From 9143ab0b07f49d62e9655fb52bb6e788ab548744 Mon Sep 17 00:00:00 2001 From: Bruno Bornsztein Date: Thu, 5 Feb 2026 07:40:51 -0600 Subject: [PATCH] feat(extensions): add ty-openworkflow for ephemeral compute workflows Add a new TaskYou extension that enables spawning durable workflows on ephemeral compute platforms using the OpenWorkflow architecture. Features: - Pluggable compute adapters (exec, Docker, Cloudflare Workers) - OpenWorkflow-style deterministic replay with step memoization - Durable sleep that survives process restarts - TaskYou integration via CLI bridge - SQLite state management for workflow and run tracking - Webhook server for receiving completion callbacks - Background polling for status updates Compute Adapters: - exec: Local process execution (Node.js/Python) - docker: Isolated container execution - cloudflare: Serverless edge execution on Cloudflare Workers The extension follows the same sidecar pattern as ty-email, running as a separate process that communicates with TaskYou via the ty CLI. 
Co-Authored-By: Claude Opus 4.5 --- extensions/ty-openworkflow/README.md | 318 +++++++++ extensions/ty-openworkflow/cmd/main.go | 593 +++++++++++++++++ .../ty-openworkflow/config.example.yaml | 52 ++ extensions/ty-openworkflow/go.mod | 26 + extensions/ty-openworkflow/go.sum | 59 ++ .../ty-openworkflow/internal/bridge/bridge.go | 269 ++++++++ .../internal/compute/cloudflare.go | 324 ++++++++++ .../internal/compute/compute.go | 145 +++++ .../internal/compute/docker.go | 610 +++++++++++++++++ .../ty-openworkflow/internal/compute/exec.go | 612 ++++++++++++++++++ .../internal/compute/exec_test.go | 212 ++++++ .../ty-openworkflow/internal/config/config.go | 214 ++++++ .../internal/config/config_test.go | 225 +++++++ .../ty-openworkflow/internal/runner/runner.go | 386 +++++++++++ .../ty-openworkflow/internal/state/state.go | 325 ++++++++++ .../internal/state/state_test.go | 442 +++++++++++++ 16 files changed, 4812 insertions(+) create mode 100644 extensions/ty-openworkflow/README.md create mode 100644 extensions/ty-openworkflow/cmd/main.go create mode 100644 extensions/ty-openworkflow/config.example.yaml create mode 100644 extensions/ty-openworkflow/go.mod create mode 100644 extensions/ty-openworkflow/go.sum create mode 100644 extensions/ty-openworkflow/internal/bridge/bridge.go create mode 100644 extensions/ty-openworkflow/internal/compute/cloudflare.go create mode 100644 extensions/ty-openworkflow/internal/compute/compute.go create mode 100644 extensions/ty-openworkflow/internal/compute/docker.go create mode 100644 extensions/ty-openworkflow/internal/compute/exec.go create mode 100644 extensions/ty-openworkflow/internal/compute/exec_test.go create mode 100644 extensions/ty-openworkflow/internal/config/config.go create mode 100644 extensions/ty-openworkflow/internal/config/config_test.go create mode 100644 extensions/ty-openworkflow/internal/runner/runner.go create mode 100644 extensions/ty-openworkflow/internal/state/state.go create mode 100644 
extensions/ty-openworkflow/internal/state/state_test.go diff --git a/extensions/ty-openworkflow/README.md b/extensions/ty-openworkflow/README.md new file mode 100644 index 00000000..78458c0c --- /dev/null +++ b/extensions/ty-openworkflow/README.md @@ -0,0 +1,318 @@ +# ty-openworkflow + +A TaskYou sidecar for spawning workflows on ephemeral compute platforms using the [OpenWorkflow](https://github.com/openworkflowdev/openworkflow) architecture. + +## Overview + +ty-openworkflow enables tasks to spawn durable, fault-tolerant workflows on various compute platforms: + +- **Local exec**: Run workflows as local processes (development/testing) +- **Docker**: Run workflows in isolated containers +- **Cloudflare Workers**: Run workflows on the edge (serverless) + +The extension follows the OpenWorkflow architecture pattern, providing: + +- **Deterministic replay**: Steps are memoized and can be replayed from any point +- **Durable sleep**: Workflows can pause and resume across restarts +- **Fault tolerance**: Failed workers are automatically recovered +- **TaskYou integration**: Workflow runs are linked to tasks for tracking + +## Installation + +```bash +cd extensions/ty-openworkflow +go build -o ty-openworkflow ./cmd +``` + +Or install directly: + +```bash +go install github.com/bborn/workflow/extensions/ty-openworkflow/cmd@latest +``` + +## Quick Start + +1. Initialize configuration: + +```bash +ty-openworkflow init +``` + +2. Deploy a workflow: + +```bash +cat <<'EOF' | ty-openworkflow deploy my-workflow -f - +async function workflow(input, { step, sleep }) { + const greeting = await step("greet", () => { + return `Hello, ${input.name}!`; + }); + + await sleep("wait", 5000); + + const result = await step("complete", () => { + return { message: greeting, timestamp: Date.now() }; + }); + + return result; +} +EOF +``` + +3. Start a workflow run: + +```bash +ty-openworkflow start my-workflow -i '{"name": "World"}' +``` + +4. 
Check status: + +```bash +ty-openworkflow status +``` + +## Workflow Code + +Workflows are functions that receive input and a context object with `step` and `sleep` helpers: + +### JavaScript/Node.js + +```javascript +async function workflow(input, { step, sleep }) { + // Steps are memoized - safe to replay + const data = await step("fetch-data", async () => { + const res = await fetch(input.url); + return res.json(); + }); + + // Durable sleep - survives restarts + await sleep("wait", 60000); // 1 minute + + // Process the data + const result = await step("process", () => { + return transform(data); + }); + + return result; +} +``` + +### Python + +```python +def workflow(input, ctx): + # Steps are memoized + data = ctx.step("fetch-data", lambda: fetch_data(input["url"])) + + # Durable sleep + ctx.sleep("wait", 60) # 60 seconds + + # Process + result = ctx.step("process", lambda: transform(data)) + + return result +``` + +## Commands + +### Deploy a workflow + +```bash +ty-openworkflow deploy [flags] + +Flags: + -f, --file string Workflow code file (or pipe to stdin) + -a, --adapter string Compute adapter (exec, docker, cloudflare) + --name string Workflow name + --description string Workflow description + --version string Workflow version (default "1.0.0") + --runtime string Runtime (node, python) (default "node") +``` + +### Start a workflow run + +```bash +ty-openworkflow start [flags] + +Flags: + -i, --input string Input JSON + --input-file string Input JSON file + -t, --task string Task title (creates linked task) + --create-task Create a linked task +``` + +### Check run status + +```bash +ty-openworkflow status [flags] + +Flags: + --json Output as JSON +``` + +### List workflows and runs + +```bash +ty-openworkflow list workflows +ty-openworkflow list runs [flags] + +Flags: + -w, --workflow string Filter by workflow ID + -s, --status string Filter by status + -n, --limit int Max results (default 20) +``` + +### Cancel a run + +```bash +ty-openworkflow 
cancel +``` + +### Start the server + +```bash +ty-openworkflow serve +``` + +Starts the webhook server and background polling for workflow status updates. + +## Configuration + +Configuration file: `~/.config/ty-openworkflow/config.yaml` + +```yaml +data_dir: ~/.config/ty-openworkflow +default_adapter: exec + +adapters: + exec: + enabled: true + work_dir: ~/.config/ty-openworkflow/exec + + docker: + enabled: false + work_dir: ~/.config/ty-openworkflow/docker + network: "" + + cloudflare: + enabled: false + account_id: "" + api_token_cmd: "op read 'op://Private/Cloudflare/api_token'" + namespace: "" + +webhook: + enabled: false + port: 8765 + host: localhost + path: /webhook + external_url: "" + +taskyou: + cli: ty + project: "" + auto_create_tasks: true + +poll_interval: 30s +``` + +## Compute Adapters + +### Local Exec + +Runs workflows as local Node.js or Python processes. Best for development and testing. + +```yaml +adapters: + exec: + enabled: true + work_dir: ~/.config/ty-openworkflow/exec +``` + +### Docker + +Runs workflows in isolated Docker containers. Requires Docker installed. + +```yaml +adapters: + docker: + enabled: true + work_dir: ~/.config/ty-openworkflow/docker + network: my-network # Optional +``` + +### Cloudflare Workers + +Runs workflows on Cloudflare's edge network. Requires: +- Cloudflare account +- API token with Workers permissions +- KV namespace for state + +```yaml +adapters: + cloudflare: + enabled: true + account_id: your-account-id + api_token_cmd: "op read 'op://Private/Cloudflare/api_token'" + namespace: your-kv-namespace +``` + +## TaskYou Integration + +Workflows can be linked to TaskYou tasks: + +```bash +# Create task automatically +ty-openworkflow start my-workflow -i '{"data": "value"}' --create-task + +# Specify task title +ty-openworkflow start my-workflow -t "Process data job" +``` + +When a workflow completes or fails, the linked task is updated automatically. 
+ +## Architecture + +``` + ┌─────────────────────┐ + │ Compute Platform │ + │ (exec/docker/cf) │ + └──────────┬──────────┘ + │ +┌──────────────┐ ┌──────────────┐ ┌──────┴──────┐ +│ TaskYou │◄───│ty-openworkflow│◄───│ Webhook │ +│ (via CLI) │ │ (sidecar) │ │ Callback │ +└──────────────┘ └──────────────┘ └─────────────┘ + ▲ │ + │ │ + └───────────────────┘ + Task Updates + +Components: +- Compute Adapters: Interface to execution platforms +- Runner: Orchestrates workflow deployment and execution +- Bridge: Communicates with TaskYou via CLI +- State: SQLite database for tracking workflows and runs +- Webhook: Receives completion callbacks from compute platforms +``` + +## OpenWorkflow Concepts + +### Deterministic Replay + +Each workflow step is memoized. When a workflow is replayed (e.g., after a crash), completed steps return their cached results without re-executing. + +### Durable Sleep + +The `sleep()` function creates a checkpoint. The workflow can be stopped and resumed later, continuing from where it left off. + +### Step Types + +- `step(name, fn)`: Execute arbitrary code, cache result +- `sleep(name, duration)`: Pause execution durably + +### Fault Tolerance + +Workers heartbeat while executing. If a worker crashes, another worker can pick up the workflow and replay from the last checkpoint. + +## License + +MIT diff --git a/extensions/ty-openworkflow/cmd/main.go b/extensions/ty-openworkflow/cmd/main.go new file mode 100644 index 00000000..d959ada1 --- /dev/null +++ b/extensions/ty-openworkflow/cmd/main.go @@ -0,0 +1,593 @@ +// ty-openworkflow: A sidecar for spawning workflows on ephemeral compute platforms. 
+package main + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "github.com/bborn/workflow/extensions/ty-openworkflow/internal/bridge" + "github.com/bborn/workflow/extensions/ty-openworkflow/internal/compute" + "github.com/bborn/workflow/extensions/ty-openworkflow/internal/config" + "github.com/bborn/workflow/extensions/ty-openworkflow/internal/runner" + "github.com/bborn/workflow/extensions/ty-openworkflow/internal/state" + "github.com/spf13/cobra" +) + +var ( + cfgFile string + cfg *config.Config +) + +func main() { + rootCmd := &cobra.Command{ + Use: "ty-openworkflow", + Short: "Spawn workflows on ephemeral compute platforms", + Long: `ty-openworkflow is a sidecar for TaskYou that enables spawning +workflows on ephemeral compute platforms like Cloudflare Workers, +Docker containers, or local processes. + +It follows the OpenWorkflow architecture pattern for durable, +fault-tolerant workflow execution with deterministic replay.`, + PersistentPreRunE: func(cmd *cobra.Command, args []string) error { + var err error + path := cfgFile + if path == "" { + path = config.ConfigPath() + } + cfg, err = config.Load(path) + return err + }, + } + + rootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file path") + + // Add commands + rootCmd.AddCommand(serveCmd()) + rootCmd.AddCommand(deployCmd()) + rootCmd.AddCommand(startCmd()) + rootCmd.AddCommand(statusCmd()) + rootCmd.AddCommand(listCmd()) + rootCmd.AddCommand(cancelCmd()) + rootCmd.AddCommand(initCmd()) + rootCmd.AddCommand(adaptersCmd()) + + if err := rootCmd.Execute(); err != nil { + fmt.Fprintln(os.Stderr, err) + os.Exit(1) + } +} + +// serveCmd runs the webhook server and background polling. 
+func serveCmd() *cobra.Command { + return &cobra.Command{ + Use: "serve", + Short: "Start the webhook server and polling", + Long: `Starts a webhook server to receive workflow completion callbacks and polls for run status updates.`, + RunE: func(cmd *cobra.Command, args []string) error { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Handle signals + sigCh := make(chan os.Signal, 1) + signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigCh + fmt.Println("\nShutting down...") + cancel() + }() + + // Initialize components + r, err := initRunner() + if err != nil { + return err + } + + // Start polling in background + go r.StartPolling(ctx) + + // Start webhook server if enabled + if cfg.Webhook.Enabled { + mux := http.NewServeMux() + mux.HandleFunc(cfg.Webhook.Path, func(w http.ResponseWriter, req *http.Request) { + handleWebhook(r, w, req) + }) + + addr := fmt.Sprintf("%s:%d", cfg.Webhook.Host, cfg.Webhook.Port) + server := &http.Server{Addr: addr, Handler: mux} + + go func() { + fmt.Printf("Webhook server listening on %s\n", addr) + if err := server.ListenAndServe(); err != http.ErrServerClosed { + fmt.Fprintf(os.Stderr, "Server error: %v\n", err) + } + }() + + <-ctx.Done() + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second) + defer shutdownCancel() + server.Shutdown(shutdownCtx) + } else { + fmt.Println("Polling for workflow status updates...") + <-ctx.Done() + } + + return nil + }, + } +} + +func handleWebhook(r *runner.Runner, w http.ResponseWriter, req *http.Request) { + if req.Method != http.MethodPost { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + body, err := io.ReadAll(req.Body) + if err != nil { + http.Error(w, "Failed to read body", http.StatusBadRequest) + return + } + + var payload struct { + RunID string `json:"runId"` + Status string `json:"status"` + Output map[string]any `json:"output"` + Error string `json:"error"` 
+ } + + if err := json.Unmarshal(body, &payload); err != nil { + http.Error(w, "Invalid JSON", http.StatusBadRequest) + return + } + + if err := r.HandleWebhook(payload.RunID, payload.Status, payload.Output, payload.Error); err != nil { + fmt.Fprintf(os.Stderr, "Webhook error: %v\n", err) + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"ok":true}`)) +} + +// deployCmd deploys a workflow. +func deployCmd() *cobra.Command { + var ( + name string + description string + version string + runtime string + adapter string + codeFile string + ) + + cmd := &cobra.Command{ + Use: "deploy ", + Short: "Deploy a workflow to a compute platform", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + workflowID := args[0] + + // Read workflow code + var code string + if codeFile != "" { + data, err := os.ReadFile(codeFile) + if err != nil { + return fmt.Errorf("read code file: %w", err) + } + code = string(data) + } else { + // Read from stdin + data, err := io.ReadAll(os.Stdin) + if err != nil { + return fmt.Errorf("read stdin: %w", err) + } + code = string(data) + } + + if code == "" { + return fmt.Errorf("workflow code is required (use -f or pipe to stdin)") + } + + if adapter == "" { + adapter = cfg.DefaultAdapter + } + + workflow := &compute.WorkflowDefinition{ + ID: workflowID, + Name: name, + Description: description, + Version: version, + Runtime: runtime, + Code: code, + } + + if workflow.Name == "" { + workflow.Name = workflowID + } + if workflow.Version == "" { + workflow.Version = "1.0.0" + } + if workflow.Runtime == "" { + workflow.Runtime = "node" + } + + r, err := initRunner() + if err != nil { + return err + } + + if err := r.DeployWorkflow(context.Background(), workflow, adapter); err != nil { + return err + } + + fmt.Printf("Deployed workflow %s to %s\n", workflowID, adapter) + return nil + }, + } + + cmd.Flags().StringVar(&name, "name", "", "workflow 
name") + cmd.Flags().StringVar(&description, "description", "", "workflow description") + cmd.Flags().StringVar(&version, "version", "1.0.0", "workflow version") + cmd.Flags().StringVar(&runtime, "runtime", "node", "workflow runtime (node, python)") + cmd.Flags().StringVarP(&adapter, "adapter", "a", "", "compute adapter (exec, docker, cloudflare)") + cmd.Flags().StringVarP(&codeFile, "file", "f", "", "workflow code file") + + return cmd +} + +// startCmd starts a workflow run. +func startCmd() *cobra.Command { + var ( + inputJSON string + inputFile string + taskTitle string + createTask bool + ) + + cmd := &cobra.Command{ + Use: "start ", + Short: "Start a workflow run", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + workflowID := args[0] + + // Parse input + var input map[string]any + if inputFile != "" { + data, err := os.ReadFile(inputFile) + if err != nil { + return fmt.Errorf("read input file: %w", err) + } + if err := json.Unmarshal(data, &input); err != nil { + return fmt.Errorf("parse input: %w", err) + } + } else if inputJSON != "" { + if err := json.Unmarshal([]byte(inputJSON), &input); err != nil { + return fmt.Errorf("parse input: %w", err) + } + } else { + input = make(map[string]any) + } + + r, err := initRunner() + if err != nil { + return err + } + + ctx := context.Background() + + if createTask || cfg.TaskYou.AutoCreateTasks { + if taskTitle == "" { + taskTitle = fmt.Sprintf("Workflow: %s", workflowID) + } + + run, task, err := r.StartWorkflowWithTask(ctx, workflowID, input, taskTitle) + if err != nil { + return err + } + + fmt.Printf("Started workflow run %s (task #%d)\n", run.ID, task.ID) + fmt.Printf("Status: %s\n", run.Status) + } else { + run, err := r.StartWorkflow(ctx, workflowID, input, 0) + if err != nil { + return err + } + + fmt.Printf("Started workflow run %s\n", run.ID) + fmt.Printf("Status: %s\n", run.Status) + } + + return nil + }, + } + + cmd.Flags().StringVarP(&inputJSON, "input", "i", "", 
"input JSON") + cmd.Flags().StringVar(&inputFile, "input-file", "", "input JSON file") + cmd.Flags().StringVarP(&taskTitle, "task", "t", "", "task title (creates linked task)") + cmd.Flags().BoolVar(&createTask, "create-task", false, "create a linked task") + + return cmd +} + +// statusCmd shows the status of a workflow run. +func statusCmd() *cobra.Command { + var jsonOutput bool + + cmd := &cobra.Command{ + Use: "status ", + Short: "Show workflow run status", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + runID := args[0] + + r, err := initRunner() + if err != nil { + return err + } + + run, err := r.GetRunStatus(context.Background(), runID) + if err != nil { + return err + } + + if jsonOutput { + data, _ := json.MarshalIndent(run, "", " ") + fmt.Println(string(data)) + } else { + fmt.Printf("Run ID: %s\n", run.ID) + fmt.Printf("Workflow: %s\n", run.WorkflowID) + fmt.Printf("Status: %s\n", run.Status) + fmt.Printf("Started: %s\n", run.StartedAt.Format(time.RFC3339)) + if run.CompletedAt != nil { + fmt.Printf("Completed: %s\n", run.CompletedAt.Format(time.RFC3339)) + } + if run.Error != "" { + fmt.Printf("Error: %s\n", run.Error) + } + if run.Output != nil { + output, _ := json.MarshalIndent(run.Output, "", " ") + fmt.Printf("Output:\n%s\n", output) + } + } + + return nil + }, + } + + cmd.Flags().BoolVar(&jsonOutput, "json", false, "output as JSON") + + return cmd +} + +// listCmd lists workflows or runs. 
+func listCmd() *cobra.Command { + cmd := &cobra.Command{ + Use: "list", + Short: "List workflows or runs", + } + + // List workflows + workflowsCmd := &cobra.Command{ + Use: "workflows", + Short: "List deployed workflows", + RunE: func(cmd *cobra.Command, args []string) error { + r, err := initRunner() + if err != nil { + return err + } + + workflows, err := r.ListWorkflows() + if err != nil { + return err + } + + if len(workflows) == 0 { + fmt.Println("No workflows deployed") + return nil + } + + fmt.Printf("%-20s %-15s %-10s %-10s %s\n", "ID", "NAME", "VERSION", "ADAPTER", "UPDATED") + for _, w := range workflows { + fmt.Printf("%-20s %-15s %-10s %-10s %s\n", + w.ID, w.Name, w.Version, w.Adapter, w.UpdatedAt.Format("2006-01-02 15:04")) + } + + return nil + }, + } + + // List runs + var ( + workflowID string + runStatus string + limit int + ) + + runsCmd := &cobra.Command{ + Use: "runs", + Short: "List workflow runs", + RunE: func(cmd *cobra.Command, args []string) error { + r, err := initRunner() + if err != nil { + return err + } + + runs, err := r.ListRuns(workflowID, runStatus, limit) + if err != nil { + return err + } + + if len(runs) == 0 { + fmt.Println("No runs found") + return nil + } + + fmt.Printf("%-40s %-20s %-10s %-10s %s\n", "ID", "WORKFLOW", "STATUS", "TASK", "STARTED") + for _, run := range runs { + taskStr := "-" + if run.TaskID > 0 { + taskStr = fmt.Sprintf("#%d", run.TaskID) + } + fmt.Printf("%-40s %-20s %-10s %-10s %s\n", + run.ID, run.WorkflowID, run.Status, taskStr, run.StartedAt.Format("2006-01-02 15:04")) + } + + return nil + }, + } + + runsCmd.Flags().StringVarP(&workflowID, "workflow", "w", "", "filter by workflow ID") + runsCmd.Flags().StringVarP(&runStatus, "status", "s", "", "filter by status") + runsCmd.Flags().IntVarP(&limit, "limit", "n", 20, "max results") + + cmd.AddCommand(workflowsCmd, runsCmd) + return cmd +} + +// cancelCmd cancels a workflow run. 
+func cancelCmd() *cobra.Command { + return &cobra.Command{ + Use: "cancel ", + Short: "Cancel a workflow run", + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + runID := args[0] + + r, err := initRunner() + if err != nil { + return err + } + + if err := r.CancelRun(context.Background(), runID); err != nil { + return err + } + + fmt.Printf("Canceled run %s\n", runID) + return nil + }, + } +} + +// initCmd initializes configuration. +func initCmd() *cobra.Command { + return &cobra.Command{ + Use: "init", + Short: "Initialize configuration", + RunE: func(cmd *cobra.Command, args []string) error { + path := cfgFile + if path == "" { + path = config.ConfigPath() + } + + // Check if config exists + if _, err := os.Stat(path); err == nil { + fmt.Printf("Config already exists at %s\n", path) + return nil + } + + // Create default config + cfg := config.DefaultConfig() + + if err := config.Save(cfg, path); err != nil { + return err + } + + fmt.Printf("Created config at %s\n", path) + fmt.Println("\nEdit this file to configure compute adapters.") + return nil + }, + } +} + +// adaptersCmd shows available adapters. +func adaptersCmd() *cobra.Command { + return &cobra.Command{ + Use: "adapters", + Short: "List available compute adapters", + RunE: func(cmd *cobra.Command, args []string) error { + factory := initAdapters() + + fmt.Println("Available adapters:") + for _, adapter := range factory.All() { + status := "not available" + if adapter.IsAvailable() { + status = "available" + } + fmt.Printf(" %-15s %s\n", adapter.Name(), status) + } + + return nil + }, + } +} + +// initRunner creates and initializes all components. 
+func initRunner() (*runner.Runner, error) { + // Ensure data directory exists + if err := os.MkdirAll(cfg.DataDir, 0755); err != nil { + return nil, fmt.Errorf("create data dir: %w", err) + } + + // Open state database + stateDB, err := state.Open(cfg.DataDir) + if err != nil { + return nil, fmt.Errorf("open state db: %w", err) + } + + // Initialize adapters + factory := initAdapters() + + // Initialize bridge + br := bridge.NewBridge(cfg.TaskYou.CLI, cfg.TaskYou.Project) + + // Create runner + r := runner.NewRunner(factory, stateDB, br, runner.Config{ + WebhookURL: cfg.WebhookURL(), + PollInterval: cfg.PollInterval, + }) + + return r, nil +} + +// initAdapters creates and registers compute adapters. +func initAdapters() *compute.Factory { + factory := compute.NewFactory() + + // Register exec adapter + if cfg.Adapters.Exec.Enabled { + execAdapter := compute.NewExecAdapter(compute.ExecConfig{ + WorkDir: cfg.Adapters.Exec.WorkDir, + }) + factory.Register(execAdapter) + } + + // Register docker adapter + if cfg.Adapters.Docker.Enabled { + dockerAdapter := compute.NewDockerAdapter(compute.DockerConfig{ + WorkDir: cfg.Adapters.Docker.WorkDir, + Network: cfg.Adapters.Docker.Network, + }) + factory.Register(dockerAdapter) + } + + // Register cloudflare adapter + if cfg.Adapters.Cloudflare.Enabled { + cfAdapter := compute.NewCloudflareAdapter(compute.CloudflareConfig{ + AccountID: cfg.Adapters.Cloudflare.AccountID, + APIToken: cfg.Adapters.Cloudflare.APIToken, + Namespace: cfg.Adapters.Cloudflare.Namespace, + }) + factory.Register(cfAdapter) + } + + return factory +} diff --git a/extensions/ty-openworkflow/config.example.yaml b/extensions/ty-openworkflow/config.example.yaml new file mode 100644 index 00000000..420e3ee8 --- /dev/null +++ b/extensions/ty-openworkflow/config.example.yaml @@ -0,0 +1,52 @@ +# ty-openworkflow configuration +# Copy this to ~/.config/ty-openworkflow/config.yaml + +# Data directory for state and workflow files +data_dir: ~/.config/ty-openworkflow 
+ +# Default compute adapter to use +default_adapter: exec + +# Compute adapter configurations +adapters: + # Local process execution (always available) + exec: + enabled: true + work_dir: ~/.config/ty-openworkflow/exec + + # Docker container execution + docker: + enabled: false + work_dir: ~/.config/ty-openworkflow/docker + network: "" # Optional Docker network + + # Cloudflare Workers (serverless edge) + cloudflare: + enabled: false + account_id: "" + # API token - use api_token_cmd for secure retrieval + api_token: "" + api_token_cmd: "op read 'op://Private/Cloudflare/api_token'" + namespace: "" # KV namespace for workflow state + +# Webhook server for receiving workflow completion callbacks +webhook: + enabled: false + port: 8765 + host: localhost + path: /webhook + # External URL for compute platforms to callback to + # Required if using Cloudflare or other external platforms + external_url: "" + +# TaskYou integration +taskyou: + # Path to ty CLI binary + cli: ty + # Default project for created tasks + project: "" + # Automatically create tasks for workflow runs + auto_create_tasks: true + +# How often to poll for workflow status updates +poll_interval: 30s diff --git a/extensions/ty-openworkflow/go.mod b/extensions/ty-openworkflow/go.mod new file mode 100644 index 00000000..0471d9f4 --- /dev/null +++ b/extensions/ty-openworkflow/go.mod @@ -0,0 +1,26 @@ +module github.com/bborn/workflow/extensions/ty-openworkflow + +go 1.23.0 + +toolchain go1.24.4 + +require ( + github.com/spf13/cobra v1.9.1 + gopkg.in/yaml.v3 v3.0.1 + modernc.org/sqlite v1.37.1 +) + +require ( + github.com/dustin/go-humanize v1.0.1 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/inconshreveable/mousetrap v1.1.0 // indirect + github.com/mattn/go-isatty v0.0.20 // indirect + github.com/ncruces/go-strftime v0.1.9 // indirect + github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec // indirect + github.com/spf13/pflag v1.0.6 // indirect + golang.org/x/exp 
v0.0.0-20250408133849-7e4ce0ab07d0 // indirect + golang.org/x/sys v0.33.0 // indirect + modernc.org/libc v1.65.7 // indirect + modernc.org/mathutil v1.7.1 // indirect + modernc.org/memory v1.11.0 // indirect +) diff --git a/extensions/ty-openworkflow/go.sum b/extensions/ty-openworkflow/go.sum new file mode 100644 index 00000000..2f372deb --- /dev/null +++ b/extensions/ty-openworkflow/go.sum @@ -0,0 +1,59 @@ +github.com/cpuguy83/go-md2man/v2 v2.0.6/go.mod h1:oOW0eioCTA6cOiMLiUPZOpcVxMig6NIQQ7OS05n1F4g= +github.com/dustin/go-humanize v1.0.1 h1:GzkhY7T5VNhEkwH0PVJgjz+fX1rhBrR7pRT3mDkpeCY= +github.com/dustin/go-humanize v1.0.1/go.mod h1:Mu1zIs6XwVuF/gI1OepvI0qD18qycQx+mFykh5fBlto= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e h1:ijClszYn+mADRFY17kjQEVQ1XRhq2/JR1M3sGqeJoxs= +github.com/google/pprof v0.0.0-20250317173921-a4b03ec1a45e/go.mod h1:boTsfXsheKC2y+lKOCMpSfarhxDeIzfZG1jqGcPl3cA= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= +github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= +github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= +github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= +github.com/ncruces/go-strftime v0.1.9 h1:bY0MQC28UADQmHmaF5dgpLmImcShSi2kHU9XLdhx/f4= +github.com/ncruces/go-strftime v0.1.9/go.mod h1:Fwc5htZGVVkseilnfgOVb9mKy6w1naJmn9CehxcKcls= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec h1:W09IVJc94icq4NjY3clb7Lk8O1qJ8BdBEF8z0ibU0rE= +github.com/remyoudompheng/bigfft v0.0.0-20230129092748-24d4a6f8daec/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM= +github.com/spf13/cobra v1.9.1 
h1:CXSaggrXdbHK9CF+8ywj8Amf7PBRmPCOJugH954Nnlo= +github.com/spf13/cobra v1.9.1/go.mod h1:nDyEzZ8ogv936Cinf6g1RU9MRY64Ir93oCnqb9wxYW0= +github.com/spf13/pflag v1.0.6 h1:jFzHGLGAlb3ruxLB8MhbI6A8+AQX/2eW4qeyNZXNp2o= +github.com/spf13/pflag v1.0.6/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= +golang.org/x/exp v0.0.0-20250408133849-7e4ce0ab07d0 h1:R84qjqJb5nVJMxqWYb3np9L5ZsaDtB+a39EqjV0JSUM= +golang.org/x/exp v0.0.0-20250408133849-7e4ce0ab07d0/go.mod h1:S9Xr4PYopiDyqSyp5NjCrhFrqg6A5zA2E/iPHPhqnS8= +golang.org/x/mod v0.24.0 h1:ZfthKaKaT4NrhGVZHO1/WDTwGES4De8KtWO0SIbNJMU= +golang.org/x/mod v0.24.0/go.mod h1:IXM97Txy2VM4PJ3gI61r1YEk/gAj6zAHN3AdZt6S9Ww= +golang.org/x/sync v0.14.0 h1:woo0S4Yywslg6hp4eUFjTVOyKt0RookbpAHG4c1HmhQ= +golang.org/x/sync v0.14.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/tools v0.33.0 h1:4qz2S3zmRxbGIhDIAgjxvFutSvH5EfnsYrRBj0UI0bc= +golang.org/x/tools v0.33.0/go.mod h1:CIJMaWEY88juyUfo7UbgPqbC8rU2OqfAV1h2Qp0oMYI= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +modernc.org/cc/v4 v4.26.1 h1:+X5NtzVBn0KgsBCBe+xkDC7twLb/jNVj9FPgiwSQO3s= +modernc.org/cc/v4 v4.26.1/go.mod h1:uVtb5OGqUKpoLWhqwNQo/8LwvoiEBLvZXIQ/SmO6mL0= +modernc.org/ccgo/v4 v4.28.0 h1:rjznn6WWehKq7dG4JtLRKxb52Ecv8OUGah8+Z/SfpNU= +modernc.org/ccgo/v4 v4.28.0/go.mod h1:JygV3+9AV6SmPhDasu4JgquwU81XAKLd3OKTUDNOiKE= +modernc.org/fileutil v1.3.1 h1:8vq5fe7jdtEvoCf3Zf9Nm0Q05sH6kGx0Op2CPx1wTC8= +modernc.org/fileutil 
v1.3.1/go.mod h1:HxmghZSZVAz/LXcMNwZPA/DRrQZEVP9VX0V4LQGQFOc= +modernc.org/gc/v2 v2.6.5 h1:nyqdV8q46KvTpZlsw66kWqwXRHdjIlJOhG6kxiV/9xI= +modernc.org/gc/v2 v2.6.5/go.mod h1:YgIahr1ypgfe7chRuJi2gD7DBQiKSLMPgBQe9oIiito= +modernc.org/libc v1.65.7 h1:Ia9Z4yzZtWNtUIuiPuQ7Qf7kxYrxP1/jeHZzG8bFu00= +modernc.org/libc v1.65.7/go.mod h1:011EQibzzio/VX3ygj1qGFt5kMjP0lHb0qCW5/D/pQU= +modernc.org/mathutil v1.7.1 h1:GCZVGXdaN8gTqB1Mf/usp1Y/hSqgI2vAGGP4jZMCxOU= +modernc.org/mathutil v1.7.1/go.mod h1:4p5IwJITfppl0G4sUEDtCr4DthTaT47/N3aT6MhfgJg= +modernc.org/memory v1.11.0 h1:o4QC8aMQzmcwCK3t3Ux/ZHmwFPzE6hf2Y5LbkRs+hbI= +modernc.org/memory v1.11.0/go.mod h1:/JP4VbVC+K5sU2wZi9bHoq2MAkCnrt2r98UGeSK7Mjw= +modernc.org/opt v0.1.4 h1:2kNGMRiUjrp4LcaPuLY2PzUfqM/w9N23quVwhKt5Qm8= +modernc.org/opt v0.1.4/go.mod h1:03fq9lsNfvkYSfxrfUhZCWPk1lm4cq4N+Bh//bEtgns= +modernc.org/sortutil v1.2.1 h1:+xyoGf15mM3NMlPDnFqrteY07klSFxLElE2PVuWIJ7w= +modernc.org/sortutil v1.2.1/go.mod h1:7ZI3a3REbai7gzCLcotuw9AC4VZVpYMjDzETGsSMqJE= +modernc.org/sqlite v1.37.1 h1:EgHJK/FPoqC+q2YBXg7fUmES37pCHFc97sI7zSayBEs= +modernc.org/sqlite v1.37.1/go.mod h1:XwdRtsE1MpiBcL54+MbKcaDvcuej+IYSMfLN6gSKV8g= +modernc.org/strutil v1.2.1 h1:UneZBkQA+DX2Rp35KcM69cSsNES9ly8mQWD71HKlOA0= +modernc.org/strutil v1.2.1/go.mod h1:EHkiggD70koQxjVdSBM3JKM7k6L0FbGE5eymy9i3B9A= +modernc.org/token v1.1.0 h1:Xl7Ap9dKaEs5kLoOQeQmPWevfnk/DM5qcLcYlA8ys6Y= +modernc.org/token v1.1.0/go.mod h1:UGzOrNV1mAFSEB63lOFHIpNRUVMvYTc6yu1SMY/XTDM= diff --git a/extensions/ty-openworkflow/internal/bridge/bridge.go b/extensions/ty-openworkflow/internal/bridge/bridge.go new file mode 100644 index 00000000..0e879ca3 --- /dev/null +++ b/extensions/ty-openworkflow/internal/bridge/bridge.go @@ -0,0 +1,269 @@ +// Package bridge provides the interface to communicate with TaskYou via CLI. +package bridge + +import ( + "bytes" + "encoding/json" + "fmt" + "os/exec" + "strconv" + "strings" +) + +// Bridge communicates with TaskYou via the ty CLI. 
type Bridge struct {
	tyPath  string // path to the ty binary; defaults to "ty" resolved from PATH
	project string // optional project scope, passed to every command as --project
}

// Task represents a TaskYou task as decoded from the ty CLI's --json output.
type Task struct {
	ID          int64  `json:"id"`
	Title       string `json:"title"`
	Body        string `json:"body"`
	Status      string `json:"status"`
	Type        string `json:"type"`
	Project     string `json:"project"`
	CreatedAt   string `json:"created_at"`
	ExecutingAt string `json:"executing_at,omitempty"`
}

// NewBridge creates a new TaskYou bridge. An empty tyPath falls back to
// "ty" on PATH; project may be empty to use the CLI's default project.
func NewBridge(tyPath, project string) *Bridge {
	if tyPath == "" {
		tyPath = "ty"
	}
	return &Bridge{
		tyPath:  tyPath,
		project: project,
	}
}

// CreateTask creates a new task in TaskYou and returns the parsed task.
// It first decodes the CLI's JSON output; if that fails it falls back to
// parsing a human-readable "Created task #123" message. The fallback now
// validates the parsed ID (the previous code ignored the ParseInt error
// and could report success with ID 0), and exec failures are wrapped with
// %w so the underlying error survives even when stderr is empty.
func (b *Bridge) CreateTask(title, body, taskType string) (*Task, error) {
	args := []string{"create"}
	if title != "" {
		args = append(args, "--title", title)
	}
	if body != "" {
		args = append(args, "--body", body)
	}
	if taskType != "" {
		args = append(args, "--type", taskType)
	}
	if b.project != "" {
		args = append(args, "--project", b.project)
	}
	args = append(args, "--json")

	cmd := exec.Command(b.tyPath, args...)
	var stdout, stderr bytes.Buffer
	cmd.Stdout = &stdout
	cmd.Stderr = &stderr

	if err := cmd.Run(); err != nil {
		return nil, fmt.Errorf("create task: %s: %w", strings.TrimSpace(stderr.String()), err)
	}

	var task Task
	if err := json.Unmarshal(stdout.Bytes(), &task); err != nil {
		// Fall back to parsing "Created task #123" from plain-text output.
		output := strings.TrimSpace(stdout.String())
		if strings.Contains(output, "Created task") {
			if _, rest, ok := strings.Cut(output, "#"); ok {
				// Only the first whitespace-delimited field after '#' is the
				// ID; trailing text would otherwise poison ParseInt.
				fields := strings.Fields(rest)
				if len(fields) > 0 {
					if id, perr := strconv.ParseInt(fields[0], 10, 64); perr == nil {
						return &Task{ID: id, Title: title, Body: body, Type: taskType}, nil
					}
				}
			}
		}
		return nil, fmt.Errorf("parse task: %w (output: %s)", err, output)
	}

	return &task, nil
}

// GetTask retrieves a task by ID.
+func (b *Bridge) GetTask(taskID int64) (*Task, error) { + args := []string{"show", strconv.FormatInt(taskID, 10), "--json"} + + if b.project != "" { + args = append(args, "--project", b.project) + } + + cmd := exec.Command(b.tyPath, args...) + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + if err := cmd.Run(); err != nil { + return nil, fmt.Errorf("get task: %s", stderr.String()) + } + + var task Task + if err := json.Unmarshal(stdout.Bytes(), &task); err != nil { + return nil, fmt.Errorf("parse task: %w", err) + } + + return &task, nil +} + +// ListTasks returns tasks, optionally filtered by status. +func (b *Bridge) ListTasks(status string) ([]*Task, error) { + args := []string{"list", "--json"} + + if status != "" { + args = append(args, "--status", status) + } + if b.project != "" { + args = append(args, "--project", b.project) + } + + cmd := exec.Command(b.tyPath, args...) + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + if err := cmd.Run(); err != nil { + return nil, fmt.Errorf("list tasks: %s", stderr.String()) + } + + var tasks []*Task + if err := json.Unmarshal(stdout.Bytes(), &tasks); err != nil { + return nil, fmt.Errorf("parse tasks: %w", err) + } + + return tasks, nil +} + +// SendInput sends input to a blocked task. +func (b *Bridge) SendInput(taskID int64, input string) error { + args := []string{"input", strconv.FormatInt(taskID, 10), input} + + if b.project != "" { + args = append(args, "--project", b.project) + } + + cmd := exec.Command(b.tyPath, args...) + var stderr bytes.Buffer + cmd.Stderr = &stderr + + if err := cmd.Run(); err != nil { + return fmt.Errorf("send input: %s", stderr.String()) + } + + return nil +} + +// QueueTask queues a task for execution. 
+func (b *Bridge) QueueTask(taskID int64) error { + args := []string{"queue", strconv.FormatInt(taskID, 10)} + + if b.project != "" { + args = append(args, "--project", b.project) + } + + cmd := exec.Command(b.tyPath, args...) + var stderr bytes.Buffer + cmd.Stderr = &stderr + + if err := cmd.Run(); err != nil { + return fmt.Errorf("queue task: %s", stderr.String()) + } + + return nil +} + +// CompleteTask marks a task as complete. +func (b *Bridge) CompleteTask(taskID int64, result string) error { + args := []string{"complete", strconv.FormatInt(taskID, 10)} + + if result != "" { + args = append(args, "--message", result) + } + if b.project != "" { + args = append(args, "--project", b.project) + } + + cmd := exec.Command(b.tyPath, args...) + var stderr bytes.Buffer + cmd.Stderr = &stderr + + if err := cmd.Run(); err != nil { + return fmt.Errorf("complete task: %s", stderr.String()) + } + + return nil +} + +// FailTask marks a task as failed. +func (b *Bridge) FailTask(taskID int64, reason string) error { + args := []string{"fail", strconv.FormatInt(taskID, 10)} + + if reason != "" { + args = append(args, "--reason", reason) + } + if b.project != "" { + args = append(args, "--project", b.project) + } + + cmd := exec.Command(b.tyPath, args...) + var stderr bytes.Buffer + cmd.Stderr = &stderr + + if err := cmd.Run(); err != nil { + return fmt.Errorf("fail task: %s", stderr.String()) + } + + return nil +} + +// GetBlockedTasks returns tasks waiting for input. +func (b *Bridge) GetBlockedTasks() ([]*Task, error) { + return b.ListTasks("blocked") +} + +// GetQueuedTasks returns tasks queued for execution. +func (b *Bridge) GetQueuedTasks() ([]*Task, error) { + return b.ListTasks("queued") +} + +// AddTaskComment adds a comment/note to a task. 
+func (b *Bridge) AddTaskComment(taskID int64, comment string) error { + args := []string{"comment", strconv.FormatInt(taskID, 10), comment} + + if b.project != "" { + args = append(args, "--project", b.project) + } + + cmd := exec.Command(b.tyPath, args...) + var stderr bytes.Buffer + cmd.Stderr = &stderr + + if err := cmd.Run(); err != nil { + return fmt.Errorf("add comment: %s", stderr.String()) + } + + return nil +} + +// UpdateTaskBody updates a task's body/description. +func (b *Bridge) UpdateTaskBody(taskID int64, body string) error { + args := []string{"update", strconv.FormatInt(taskID, 10), "--body", body} + + if b.project != "" { + args = append(args, "--project", b.project) + } + + cmd := exec.Command(b.tyPath, args...) + var stderr bytes.Buffer + cmd.Stderr = &stderr + + if err := cmd.Run(); err != nil { + return fmt.Errorf("update task: %s", stderr.String()) + } + + return nil +} diff --git a/extensions/ty-openworkflow/internal/compute/cloudflare.go b/extensions/ty-openworkflow/internal/compute/cloudflare.go new file mode 100644 index 00000000..e0e67222 --- /dev/null +++ b/extensions/ty-openworkflow/internal/compute/cloudflare.go @@ -0,0 +1,324 @@ +package compute + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" +) + +// CloudflareAdapter implements compute.Adapter for Cloudflare Workers. +type CloudflareAdapter struct { + accountID string + apiToken string + namespace string // D1 database namespace for state + webhookURL string + client *http.Client + baseURL string +} + +// CloudflareConfig holds configuration for the Cloudflare adapter. +type CloudflareConfig struct { + AccountID string `yaml:"account_id"` + APIToken string `yaml:"api_token"` + Namespace string `yaml:"namespace"` // KV namespace for workflow state +} + +// NewCloudflareAdapter creates a new Cloudflare Workers adapter. 
+func NewCloudflareAdapter(cfg CloudflareConfig) *CloudflareAdapter { + return &CloudflareAdapter{ + accountID: cfg.AccountID, + apiToken: cfg.APIToken, + namespace: cfg.Namespace, + client: &http.Client{Timeout: 30 * time.Second}, + baseURL: "https://api.cloudflare.com/client/v4", + } +} + +func (c *CloudflareAdapter) Name() string { + return "cloudflare" +} + +func (c *CloudflareAdapter) IsAvailable() bool { + return c.accountID != "" && c.apiToken != "" +} + +func (c *CloudflareAdapter) SetWebhook(url string) { + c.webhookURL = url +} + +// Deploy creates or updates a Cloudflare Worker with the workflow code. +func (c *CloudflareAdapter) Deploy(ctx context.Context, workflow *WorkflowDefinition) error { + // Wrap the workflow code in an OpenWorkflow-compatible worker + workerCode := c.generateWorkerCode(workflow) + + url := fmt.Sprintf("%s/accounts/%s/workers/scripts/ow-%s", c.baseURL, c.accountID, workflow.ID) + + // Create multipart form with worker script + body := bytes.NewBufferString(workerCode) + + req, err := http.NewRequestWithContext(ctx, "PUT", url, body) + if err != nil { + return fmt.Errorf("create request: %w", err) + } + + req.Header.Set("Authorization", "Bearer "+c.apiToken) + req.Header.Set("Content-Type", "application/javascript") + + resp, err := c.client.Do(req) + if err != nil { + return fmt.Errorf("deploy worker: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode >= 400 { + respBody, _ := io.ReadAll(resp.Body) + return fmt.Errorf("deploy failed (%d): %s", resp.StatusCode, string(respBody)) + } + + return nil +} + +// generateWorkerCode wraps workflow code in a Cloudflare Worker with OpenWorkflow semantics. 
+func (c *CloudflareAdapter) generateWorkerCode(workflow *WorkflowDefinition) string { + return fmt.Sprintf(` +// OpenWorkflow Worker: %s v%s +// Generated by ty-openworkflow + +const WORKFLOW_ID = "%s"; +const WEBHOOK_URL = "%s"; + +// Step execution with memoization +async function step(name, fn, env) { + const stepKey = WORKFLOW_ID + ":" + env.RUN_ID + ":" + name; + + // Check KV for cached result + const cached = await env.WORKFLOW_STATE.get(stepKey, { type: "json" }); + if (cached && cached.status === "completed") { + return cached.output; + } + + // Execute step + const startedAt = new Date().toISOString(); + try { + const output = await fn(); + await env.WORKFLOW_STATE.put(stepKey, JSON.stringify({ + status: "completed", + output, + startedAt, + completedAt: new Date().toISOString() + })); + return output; + } catch (error) { + await env.WORKFLOW_STATE.put(stepKey, JSON.stringify({ + status: "failed", + error: error.message, + startedAt, + completedAt: new Date().toISOString() + })); + throw error; + } +} + +// Sleep with durable scheduling +async function sleep(name, durationMs, env) { + const stepKey = WORKFLOW_ID + ":" + env.RUN_ID + ":" + name + ":sleep"; + + const cached = await env.WORKFLOW_STATE.get(stepKey, { type: "json" }); + if (cached && cached.status === "completed") { + return; + } + + if (cached && cached.status === "sleeping") { + const wakeAt = new Date(cached.wakeAt); + if (new Date() >= wakeAt) { + await env.WORKFLOW_STATE.put(stepKey, JSON.stringify({ + status: "completed", + completedAt: new Date().toISOString() + })); + return; + } + // Still sleeping - schedule continuation + throw new Error("SLEEPING:" + cached.wakeAt); + } + + // Start sleeping + const wakeAt = new Date(Date.now() + durationMs).toISOString(); + await env.WORKFLOW_STATE.put(stepKey, JSON.stringify({ + status: "sleeping", + wakeAt, + startedAt: new Date().toISOString() + })); + throw new Error("SLEEPING:" + wakeAt); +} + +// User workflow code +%s + +// Worker entry 
point +export default { + async fetch(request, env) { + const url = new URL(request.url); + + if (request.method === "POST" && url.pathname === "/start") { + const input = await request.json(); + const runID = crypto.randomUUID(); + + // Store run state + await env.WORKFLOW_STATE.put(WORKFLOW_ID + ":" + runID + ":meta", JSON.stringify({ + id: runID, + workflowId: WORKFLOW_ID, + status: "running", + input, + startedAt: new Date().toISOString() + })); + + // Execute workflow + env.RUN_ID = runID; + try { + const output = await workflow(input, { step: (n, f) => step(n, f, env), sleep: (n, d) => sleep(n, d, env) }); + + // Mark completed + await env.WORKFLOW_STATE.put(WORKFLOW_ID + ":" + runID + ":meta", JSON.stringify({ + id: runID, + workflowId: WORKFLOW_ID, + status: "completed", + input, + output, + startedAt: new Date().toISOString(), + completedAt: new Date().toISOString() + })); + + // Call webhook + if (WEBHOOK_URL) { + await fetch(WEBHOOK_URL, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ runId: runID, status: "completed", output }) + }); + } + + return Response.json({ runId: runID, status: "completed", output }); + } catch (error) { + if (error.message.startsWith("SLEEPING:")) { + const wakeAt = error.message.split(":")[1]; + return Response.json({ runId: runID, status: "sleeping", wakeAt }); + } + + // Mark failed + await env.WORKFLOW_STATE.put(WORKFLOW_ID + ":" + runID + ":meta", JSON.stringify({ + id: runID, + workflowId: WORKFLOW_ID, + status: "failed", + input, + error: error.message, + startedAt: new Date().toISOString(), + completedAt: new Date().toISOString() + })); + + if (WEBHOOK_URL) { + await fetch(WEBHOOK_URL, { + method: "POST", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ runId: runID, status: "failed", error: error.message }) + }); + } + + return Response.json({ runId: runID, status: "failed", error: error.message }, { status: 500 }); + } + } + + if 
(request.method === "GET" && url.pathname.startsWith("/status/")) { + const runID = url.pathname.split("/")[2]; + const meta = await env.WORKFLOW_STATE.get(WORKFLOW_ID + ":" + runID + ":meta", { type: "json" }); + if (!meta) { + return Response.json({ error: "not found" }, { status: 404 }); + } + return Response.json(meta); + } + + return Response.json({ workflow: WORKFLOW_ID, version: "%s" }); + } +}; +`, workflow.Name, workflow.Version, workflow.ID, c.webhookURL, workflow.Code, workflow.Version) +} + +// Start initiates a new workflow run on Cloudflare Workers. +func (c *CloudflareAdapter) Start(ctx context.Context, workflowID string, input map[string]any) (*WorkflowRun, error) { + url := fmt.Sprintf("https://ow-%s.%s.workers.dev/start", workflowID, c.accountID) + + body, err := json.Marshal(input) + if err != nil { + return nil, fmt.Errorf("marshal input: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(body)) + if err != nil { + return nil, fmt.Errorf("create request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := c.client.Do(req) + if err != nil { + return nil, fmt.Errorf("start workflow: %w", err) + } + defer resp.Body.Close() + + var result struct { + RunID string `json:"runId"` + Status string `json:"status"` + Output map[string]any `json:"output,omitempty"` + Error string `json:"error,omitempty"` + } + + if err := json.NewDecoder(resp.Body).Decode(&result); err != nil { + return nil, fmt.Errorf("decode response: %w", err) + } + + run := &WorkflowRun{ + ID: result.RunID, + WorkflowID: workflowID, + Status: RunStatus(result.Status), + Input: input, + Output: result.Output, + Error: result.Error, + StartedAt: time.Now(), + } + + return run, nil +} + +// Status retrieves the current status of a workflow run. 
+func (c *CloudflareAdapter) Status(ctx context.Context, runID string) (*WorkflowRun, error) { + // We need to know the workflow ID to query - store it in local state + // For now, assume workflow ID is embedded or use a registry + return nil, fmt.Errorf("status check requires workflow ID lookup - use ListRuns instead") +} + +// Cancel attempts to cancel a running workflow. +func (c *CloudflareAdapter) Cancel(ctx context.Context, runID string) error { + // Cloudflare Workers don't have native cancellation + // Mark as cancelled in KV state + return fmt.Errorf("cancel not yet implemented for Cloudflare Workers") +} + +// Logs retrieves execution logs for a workflow run. +func (c *CloudflareAdapter) Logs(ctx context.Context, runID string) ([]string, error) { + // Use Cloudflare Logpush or tail workers for logs + return nil, fmt.Errorf("logs retrieval not yet implemented") +} + +// Cleanup removes completed workflow resources. +func (c *CloudflareAdapter) Cleanup(ctx context.Context, runID string) error { + // Delete KV keys for this run + return nil +} + +// ListRuns returns recent workflow runs. +func (c *CloudflareAdapter) ListRuns(ctx context.Context, workflowID string, limit int) ([]*WorkflowRun, error) { + // Query KV namespace for runs + return nil, fmt.Errorf("list runs not yet implemented") +} diff --git a/extensions/ty-openworkflow/internal/compute/compute.go b/extensions/ty-openworkflow/internal/compute/compute.go new file mode 100644 index 00000000..c8e5e869 --- /dev/null +++ b/extensions/ty-openworkflow/internal/compute/compute.go @@ -0,0 +1,145 @@ +// Package compute provides the interface and implementations for ephemeral compute platforms. +package compute + +import ( + "context" + "time" +) + +// WorkflowRun represents a single workflow execution. 
type WorkflowRun struct {
	ID          string            `json:"id"`
	WorkflowID  string            `json:"workflow_id"`
	Status      RunStatus         `json:"status"`
	Input       map[string]any    `json:"input"`
	Output      map[string]any    `json:"output,omitempty"`
	Error       string            `json:"error,omitempty"`
	Steps       []StepAttempt     `json:"steps,omitempty"`
	StartedAt   time.Time         `json:"started_at"`
	CompletedAt *time.Time        `json:"completed_at,omitempty"` // nil while the run is in flight
	Metadata    map[string]string `json:"metadata,omitempty"`
}

// RunStatus represents the current state of a workflow run.
type RunStatus string

// Lifecycle states a run (or an individual step) can be in.
const (
	StatusPending   RunStatus = "pending"
	StatusRunning   RunStatus = "running"
	StatusSleeping  RunStatus = "sleeping"
	StatusCompleted RunStatus = "completed"
	StatusFailed    RunStatus = "failed"
	StatusCanceled  RunStatus = "canceled"
)

// StepAttempt records the execution of a single workflow step.
type StepAttempt struct {
	ID          string     `json:"id"`
	StepName    string     `json:"step_name"`
	StepType    StepType   `json:"step_type"`
	Status      RunStatus  `json:"status"`
	Input       any        `json:"input,omitempty"`
	Output      any        `json:"output,omitempty"`
	Error       string     `json:"error,omitempty"`
	StartedAt   time.Time  `json:"started_at"`
	CompletedAt *time.Time `json:"completed_at,omitempty"`
	RetryCount  int        `json:"retry_count"`
}

// StepType indicates what kind of step this is.
type StepType string

// Supported step kinds: plain execution, durable sleep, and external wait.
const (
	StepTypeRun   StepType = "run"
	StepTypeSleep StepType = "sleep"
	StepTypeWait  StepType = "wait"
)

// WorkflowDefinition describes a workflow that can be executed.
+type WorkflowDefinition struct { + ID string `json:"id"` + Name string `json:"name"` + Description string `json:"description,omitempty"` + Version string `json:"version"` + Code string `json:"code"` // The workflow code to execute + Runtime string `json:"runtime"` // e.g., "node", "python", "go" + Timeout time.Duration `json:"timeout,omitempty"` + MaxRetries int `json:"max_retries,omitempty"` + Metadata map[string]any `json:"metadata,omitempty"` +} + +// Adapter defines the interface for ephemeral compute platforms. +// Implementations include Cloudflare Workers, local exec, Docker, Modal, etc. +type Adapter interface { + // Name returns the adapter identifier. + Name() string + + // IsAvailable checks if the compute platform is configured and accessible. + IsAvailable() bool + + // Deploy uploads a workflow definition to the compute platform. + Deploy(ctx context.Context, workflow *WorkflowDefinition) error + + // Start initiates a new workflow run. + Start(ctx context.Context, workflowID string, input map[string]any) (*WorkflowRun, error) + + // Status retrieves the current status of a workflow run. + Status(ctx context.Context, runID string) (*WorkflowRun, error) + + // Cancel attempts to cancel a running workflow. + Cancel(ctx context.Context, runID string) error + + // Logs retrieves execution logs for a workflow run. + Logs(ctx context.Context, runID string) ([]string, error) + + // Cleanup removes completed workflow resources. + Cleanup(ctx context.Context, runID string) error + + // ListRuns returns recent workflow runs, optionally filtered. + ListRuns(ctx context.Context, workflowID string, limit int) ([]*WorkflowRun, error) + + // SetWebhook configures the callback URL for workflow completion. + SetWebhook(url string) +} + +// Factory creates compute adapters based on configuration. +type Factory struct { + adapters map[string]Adapter +} + +// NewFactory creates a new adapter factory. 
+func NewFactory() *Factory { + return &Factory{ + adapters: make(map[string]Adapter), + } +} + +// Register adds an adapter to the factory. +func (f *Factory) Register(adapter Adapter) { + f.adapters[adapter.Name()] = adapter +} + +// Get retrieves an adapter by name. +func (f *Factory) Get(name string) Adapter { + return f.adapters[name] +} + +// Available returns the names of all registered adapters. +func (f *Factory) Available() []string { + names := make([]string, 0, len(f.adapters)) + for name, adapter := range f.adapters { + if adapter.IsAvailable() { + names = append(names, name) + } + } + return names +} + +// All returns all registered adapters. +func (f *Factory) All() []Adapter { + adapters := make([]Adapter, 0, len(f.adapters)) + for _, adapter := range f.adapters { + adapters = append(adapters, adapter) + } + return adapters +} diff --git a/extensions/ty-openworkflow/internal/compute/docker.go b/extensions/ty-openworkflow/internal/compute/docker.go new file mode 100644 index 00000000..39015f74 --- /dev/null +++ b/extensions/ty-openworkflow/internal/compute/docker.go @@ -0,0 +1,610 @@ +package compute + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + "sync" + "time" +) + +// DockerAdapter implements compute.Adapter for Docker containers. +type DockerAdapter struct { + workDir string + webhookURL string + network string + runs map[string]*WorkflowRun + mu sync.RWMutex +} + +// DockerConfig holds configuration for the Docker adapter. +type DockerConfig struct { + WorkDir string `yaml:"work_dir"` + Network string `yaml:"network"` // Docker network for containers +} + +// NewDockerAdapter creates a new Docker container adapter. 
+func NewDockerAdapter(cfg DockerConfig) *DockerAdapter { + workDir := cfg.WorkDir + if workDir == "" { + workDir = filepath.Join(os.TempDir(), "ty-openworkflow-docker") + } + os.MkdirAll(workDir, 0755) + + return &DockerAdapter{ + workDir: workDir, + network: cfg.Network, + runs: make(map[string]*WorkflowRun), + } +} + +func (d *DockerAdapter) Name() string { + return "docker" +} + +func (d *DockerAdapter) IsAvailable() bool { + cmd := exec.Command("docker", "info") + return cmd.Run() == nil +} + +func (d *DockerAdapter) SetWebhook(url string) { + d.webhookURL = url +} + +// Deploy creates a Docker image for the workflow. +func (d *DockerAdapter) Deploy(ctx context.Context, workflow *WorkflowDefinition) error { + // Create workflow directory + workflowDir := filepath.Join(d.workDir, "workflows", workflow.ID) + if err := os.MkdirAll(workflowDir, 0755); err != nil { + return fmt.Errorf("create workflow dir: %w", err) + } + + // Generate Dockerfile and workflow code + dockerfile, code := d.generateDockerfiles(workflow) + + // Write Dockerfile + dockerfilePath := filepath.Join(workflowDir, "Dockerfile") + if err := os.WriteFile(dockerfilePath, []byte(dockerfile), 0644); err != nil { + return fmt.Errorf("write Dockerfile: %w", err) + } + + // Write workflow code + var codePath string + switch workflow.Runtime { + case "node", "javascript": + codePath = filepath.Join(workflowDir, "workflow.js") + case "python": + codePath = filepath.Join(workflowDir, "workflow.py") + default: + codePath = filepath.Join(workflowDir, "workflow.js") + } + + if err := os.WriteFile(codePath, []byte(code), 0644); err != nil { + return fmt.Errorf("write workflow code: %w", err) + } + + // Write metadata + metaPath := filepath.Join(workflowDir, "workflow.json") + meta, _ := json.MarshalIndent(workflow, "", " ") + if err := os.WriteFile(metaPath, meta, 0644); err != nil { + return fmt.Errorf("write metadata: %w", err) + } + + // Build Docker image + imageName := fmt.Sprintf("ow-%s:%s", 
workflow.ID, workflow.Version) + cmd := exec.CommandContext(ctx, "docker", "build", "-t", imageName, workflowDir) + cmd.Dir = workflowDir + + var stderr bytes.Buffer + cmd.Stderr = &stderr + + if err := cmd.Run(); err != nil { + return fmt.Errorf("build image: %s", stderr.String()) + } + + return nil +} + +// generateDockerfiles creates the Dockerfile and workflow code for the given runtime. +func (d *DockerAdapter) generateDockerfiles(workflow *WorkflowDefinition) (string, string) { + switch workflow.Runtime { + case "node", "javascript": + return d.generateNodeDockerfile(workflow) + case "python": + return d.generatePythonDockerfile(workflow) + default: + return d.generateNodeDockerfile(workflow) + } +} + +func (d *DockerAdapter) generateNodeDockerfile(workflow *WorkflowDefinition) (string, string) { + dockerfile := `FROM node:20-alpine +WORKDIR /app +COPY workflow.js . +CMD ["node", "workflow.js"] +` + + code := fmt.Sprintf(`#!/usr/bin/env node +// OpenWorkflow Docker Runner: %s v%s + +const fs = require('fs'); +const path = require('path'); + +const WORKFLOW_ID = "%s"; +const RUN_ID = process.env.RUN_ID || ''; +const STATE_DIR = process.env.STATE_DIR || '/state'; +const WEBHOOK_URL = process.env.WEBHOOK_URL || ''; + +function getStateFile() { + return path.join(STATE_DIR, RUN_ID + '.json'); +} + +function loadState() { + try { + return JSON.parse(fs.readFileSync(getStateFile(), 'utf8')); + } catch { + return { steps: {} }; + } +} + +function saveState(state) { + fs.mkdirSync(STATE_DIR, { recursive: true }); + fs.writeFileSync(getStateFile(), JSON.stringify(state, null, 2)); +} + +async function step(name, fn) { + const state = loadState(); + if (state.steps[name] && state.steps[name].status === 'completed') { + return state.steps[name].output; + } + + const startedAt = new Date().toISOString(); + try { + const output = await fn(); + state.steps[name] = { + status: 'completed', + output, + startedAt, + completedAt: new Date().toISOString() + }; + 
saveState(state); + return output; + } catch (error) { + state.steps[name] = { + status: 'failed', + error: error.message, + startedAt, + completedAt: new Date().toISOString() + }; + saveState(state); + throw error; + } +} + +async function sleep(name, durationMs) { + const state = loadState(); + const sleepKey = name + ':sleep'; + + if (state.steps[sleepKey] && state.steps[sleepKey].status === 'completed') { + return; + } + + if (state.steps[sleepKey] && state.steps[sleepKey].status === 'sleeping') { + const wakeAt = new Date(state.steps[sleepKey].wakeAt); + if (new Date() >= wakeAt) { + state.steps[sleepKey].status = 'completed'; + state.steps[sleepKey].completedAt = new Date().toISOString(); + saveState(state); + return; + } + const remaining = wakeAt - new Date(); + await new Promise(resolve => setTimeout(resolve, remaining)); + state.steps[sleepKey].status = 'completed'; + state.steps[sleepKey].completedAt = new Date().toISOString(); + saveState(state); + return; + } + + const wakeAt = new Date(Date.now() + durationMs); + state.steps[sleepKey] = { + status: 'sleeping', + wakeAt: wakeAt.toISOString(), + startedAt: new Date().toISOString() + }; + saveState(state); + + await new Promise(resolve => setTimeout(resolve, durationMs)); + + state.steps[sleepKey].status = 'completed'; + state.steps[sleepKey].completedAt = new Date().toISOString(); + saveState(state); +} + +// User workflow code +%s + +async function main() { + const input = JSON.parse(process.env.INPUT || '{}'); + + try { + const output = await workflow(input, { step, sleep }); + console.log(JSON.stringify({ status: 'completed', output })); + + if (WEBHOOK_URL) { + const https = require('https'); + const http = require('http'); + const url = new URL(WEBHOOK_URL); + const client = url.protocol === 'https:' ? 
https : http; + const req = client.request(url, { method: 'POST', headers: { 'Content-Type': 'application/json' }}); + req.write(JSON.stringify({ runId: RUN_ID, status: 'completed', output })); + req.end(); + } + } catch (error) { + console.error(JSON.stringify({ status: 'failed', error: error.message })); + process.exit(1); + } +} + +main(); +`, workflow.Name, workflow.Version, workflow.ID, workflow.Code) + + return dockerfile, code +} + +func (d *DockerAdapter) generatePythonDockerfile(workflow *WorkflowDefinition) (string, string) { + dockerfile := `FROM python:3.12-alpine +WORKDIR /app +COPY workflow.py . +CMD ["python3", "workflow.py"] +` + + code := fmt.Sprintf(`#!/usr/bin/env python3 +# OpenWorkflow Docker Runner: %s v%s + +import os +import json +import time +import urllib.request +from datetime import datetime +from pathlib import Path + +WORKFLOW_ID = "%s" +RUN_ID = os.environ.get("RUN_ID", "") +STATE_DIR = Path(os.environ.get("STATE_DIR", "/state")) +WEBHOOK_URL = os.environ.get("WEBHOOK_URL", "") + +def get_state_file(): + return STATE_DIR / f"{RUN_ID}.json" + +def load_state(): + try: + with open(get_state_file()) as f: + return json.load(f) + except: + return {"steps": {}} + +def save_state(state): + STATE_DIR.mkdir(parents=True, exist_ok=True) + with open(get_state_file(), "w") as f: + json.dump(state, f, indent=2) + +def step(name, fn): + state = load_state() + if name in state["steps"] and state["steps"][name].get("status") == "completed": + return state["steps"][name]["output"] + + started_at = datetime.now().isoformat() + try: + output = fn() + state["steps"][name] = { + "status": "completed", + "output": output, + "startedAt": started_at, + "completedAt": datetime.now().isoformat() + } + save_state(state) + return output + except Exception as e: + state["steps"][name] = { + "status": "failed", + "error": str(e), + "startedAt": started_at, + "completedAt": datetime.now().isoformat() + } + save_state(state) + raise + +def sleep_step(name, 
duration_seconds): + state = load_state() + sleep_key = f"{name}:sleep" + + if sleep_key in state["steps"]: + step_state = state["steps"][sleep_key] + if step_state.get("status") == "completed": + return + if step_state.get("status") == "sleeping": + wake_at = datetime.fromisoformat(step_state["wakeAt"]) + now = datetime.now() + if now >= wake_at: + state["steps"][sleep_key]["status"] = "completed" + state["steps"][sleep_key]["completedAt"] = now.isoformat() + save_state(state) + return + remaining = (wake_at - now).total_seconds() + time.sleep(remaining) + state["steps"][sleep_key]["status"] = "completed" + state["steps"][sleep_key]["completedAt"] = datetime.now().isoformat() + save_state(state) + return + + wake_at = datetime.now().timestamp() + duration_seconds + state["steps"][sleep_key] = { + "status": "sleeping", + "wakeAt": datetime.fromtimestamp(wake_at).isoformat(), + "startedAt": datetime.now().isoformat() + } + save_state(state) + time.sleep(duration_seconds) + state["steps"][sleep_key]["status"] = "completed" + state["steps"][sleep_key]["completedAt"] = datetime.now().isoformat() + save_state(state) + +class WorkflowContext: + def __init__(self): + self.step = step + self.sleep = sleep_step + +# User workflow code +%s + +def main(): + input_data = json.loads(os.environ.get("INPUT", "{}")) + ctx = WorkflowContext() + + try: + output = workflow(input_data, ctx) + print(json.dumps({"status": "completed", "output": output})) + + if WEBHOOK_URL: + req = urllib.request.Request( + WEBHOOK_URL, + data=json.dumps({"runId": RUN_ID, "status": "completed", "output": output}).encode(), + headers={"Content-Type": "application/json"}, + method="POST" + ) + urllib.request.urlopen(req) + except Exception as e: + print(json.dumps({"status": "failed", "error": str(e)})) + exit(1) + +if __name__ == "__main__": + main() +`, workflow.Name, workflow.Version, workflow.ID, workflow.Code) + + return dockerfile, code +} + +// Start initiates a new workflow run in a Docker 
container. +func (d *DockerAdapter) Start(ctx context.Context, workflowID string, input map[string]any) (*WorkflowRun, error) { + // Generate run ID + runID := fmt.Sprintf("%s-%d", workflowID, time.Now().UnixNano()) + + // Create run directory for state persistence + runDir := filepath.Join(d.workDir, "runs", runID) + if err := os.MkdirAll(runDir, 0755); err != nil { + return nil, fmt.Errorf("create run dir: %w", err) + } + + // Load workflow metadata + workflowDir := filepath.Join(d.workDir, "workflows", workflowID) + metaPath := filepath.Join(workflowDir, "workflow.json") + metaData, err := os.ReadFile(metaPath) + if err != nil { + return nil, fmt.Errorf("read workflow metadata: %w", err) + } + + var workflow WorkflowDefinition + if err := json.Unmarshal(metaData, &workflow); err != nil { + return nil, fmt.Errorf("parse workflow metadata: %w", err) + } + + // Build docker run command + imageName := fmt.Sprintf("ow-%s:%s", workflowID, workflow.Version) + containerName := fmt.Sprintf("ow-run-%s", runID) + inputJSON, _ := json.Marshal(input) + + args := []string{ + "run", "--rm", + "--name", containerName, + "-v", runDir + ":/state", + "-e", "RUN_ID=" + runID, + "-e", "INPUT=" + string(inputJSON), + "-e", "WEBHOOK_URL=" + d.webhookURL, + } + + if d.network != "" { + args = append(args, "--network", d.network) + } + + args = append(args, imageName) + + // Create run record + run := &WorkflowRun{ + ID: runID, + WorkflowID: workflowID, + Status: StatusRunning, + Input: input, + StartedAt: time.Now(), + Metadata: map[string]string{"container": containerName}, + } + + d.mu.Lock() + d.runs[runID] = run + d.mu.Unlock() + + // Start container async + go func() { + cmd := exec.CommandContext(ctx, "docker", args...) 
+ var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + err := cmd.Run() + + d.mu.Lock() + defer d.mu.Unlock() + + now := time.Now() + run.CompletedAt = &now + + if err != nil { + run.Status = StatusFailed + run.Error = stderr.String() + if run.Error == "" { + run.Error = err.Error() + } + } else { + // Parse output + lines := strings.Split(strings.TrimSpace(stdout.String()), "\n") + if len(lines) > 0 { + lastLine := lines[len(lines)-1] + var result struct { + Status string `json:"status"` + Output map[string]any `json:"output"` + Error string `json:"error"` + } + if err := json.Unmarshal([]byte(lastLine), &result); err == nil { + if result.Status == "completed" { + run.Status = StatusCompleted + run.Output = result.Output + } else { + run.Status = StatusFailed + run.Error = result.Error + } + } else { + run.Status = StatusCompleted + run.Output = map[string]any{"raw": stdout.String()} + } + } + } + + // Write final state + stateData, _ := json.MarshalIndent(run, "", " ") + os.WriteFile(filepath.Join(runDir, "result.json"), stateData, 0644) + }() + + return run, nil +} + +// Status retrieves the current status of a workflow run. +func (d *DockerAdapter) Status(ctx context.Context, runID string) (*WorkflowRun, error) { + d.mu.RLock() + defer d.mu.RUnlock() + + run, ok := d.runs[runID] + if !ok { + // Try to load from disk + runDir := filepath.Join(d.workDir, "runs", runID) + resultPath := filepath.Join(runDir, "result.json") + data, err := os.ReadFile(resultPath) + if err != nil { + return nil, fmt.Errorf("run not found: %s", runID) + } + + var diskRun WorkflowRun + if err := json.Unmarshal(data, &diskRun); err != nil { + return nil, fmt.Errorf("parse run result: %w", err) + } + return &diskRun, nil + } + + return run, nil +} + +// Cancel attempts to stop a running container. 
+func (d *DockerAdapter) Cancel(ctx context.Context, runID string) error { + d.mu.RLock() + run, ok := d.runs[runID] + d.mu.RUnlock() + + if !ok { + return fmt.Errorf("run not found") + } + + containerName := run.Metadata["container"] + if containerName == "" { + return fmt.Errorf("container name not found") + } + + cmd := exec.CommandContext(ctx, "docker", "stop", containerName) + if err := cmd.Run(); err != nil { + return fmt.Errorf("stop container: %w", err) + } + + d.mu.Lock() + run.Status = StatusCanceled + now := time.Now() + run.CompletedAt = &now + d.mu.Unlock() + + return nil +} + +// Logs retrieves container logs. +func (d *DockerAdapter) Logs(ctx context.Context, runID string) ([]string, error) { + d.mu.RLock() + run, ok := d.runs[runID] + d.mu.RUnlock() + + if !ok { + return nil, fmt.Errorf("run not found") + } + + containerName := run.Metadata["container"] + if containerName == "" { + return nil, fmt.Errorf("container name not found") + } + + cmd := exec.CommandContext(ctx, "docker", "logs", containerName) + var stdout bytes.Buffer + cmd.Stdout = &stdout + + if err := cmd.Run(); err != nil { + return nil, fmt.Errorf("get logs: %w", err) + } + + return strings.Split(stdout.String(), "\n"), nil +} + +// Cleanup removes the run directory. +func (d *DockerAdapter) Cleanup(ctx context.Context, runID string) error { + d.mu.Lock() + delete(d.runs, runID) + d.mu.Unlock() + + runDir := filepath.Join(d.workDir, "runs", runID) + return os.RemoveAll(runDir) +} + +// ListRuns returns recent workflow runs. 
+func (d *DockerAdapter) ListRuns(ctx context.Context, workflowID string, limit int) ([]*WorkflowRun, error) { + d.mu.RLock() + defer d.mu.RUnlock() + + var runs []*WorkflowRun + for _, run := range d.runs { + if workflowID == "" || run.WorkflowID == workflowID { + runs = append(runs, run) + if limit > 0 && len(runs) >= limit { + break + } + } + } + + return runs, nil +} diff --git a/extensions/ty-openworkflow/internal/compute/exec.go b/extensions/ty-openworkflow/internal/compute/exec.go new file mode 100644 index 00000000..cb200349 --- /dev/null +++ b/extensions/ty-openworkflow/internal/compute/exec.go @@ -0,0 +1,612 @@ +package compute + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "os" + "os/exec" + "path/filepath" + "sync" + "time" +) + +// ExecAdapter implements compute.Adapter for local process execution. +// Useful for development and testing, or for self-hosted deployments. +type ExecAdapter struct { + workDir string + webhookURL string + runs map[string]*WorkflowRun + processes map[string]*exec.Cmd + mu sync.RWMutex +} + +// ExecConfig holds configuration for the exec adapter. +type ExecConfig struct { + WorkDir string `yaml:"work_dir"` +} + +// NewExecAdapter creates a new local exec adapter. +func NewExecAdapter(cfg ExecConfig) *ExecAdapter { + workDir := cfg.WorkDir + if workDir == "" { + workDir = filepath.Join(os.TempDir(), "ty-openworkflow") + } + os.MkdirAll(workDir, 0755) + + return &ExecAdapter{ + workDir: workDir, + runs: make(map[string]*WorkflowRun), + processes: make(map[string]*exec.Cmd), + } +} + +func (e *ExecAdapter) Name() string { + return "exec" +} + +func (e *ExecAdapter) IsAvailable() bool { + return true // Local exec is always available +} + +func (e *ExecAdapter) SetWebhook(url string) { + e.webhookURL = url +} + +// Deploy stores the workflow definition locally for execution. 
+func (e *ExecAdapter) Deploy(ctx context.Context, workflow *WorkflowDefinition) error { + // Create workflow directory + workflowDir := filepath.Join(e.workDir, "workflows", workflow.ID) + if err := os.MkdirAll(workflowDir, 0755); err != nil { + return fmt.Errorf("create workflow dir: %w", err) + } + + // Write workflow code + var filename string + switch workflow.Runtime { + case "node", "javascript": + filename = "workflow.js" + case "python": + filename = "workflow.py" + case "go": + filename = "workflow.go" + default: + filename = "workflow.js" + } + + codePath := filepath.Join(workflowDir, filename) + if err := os.WriteFile(codePath, []byte(e.wrapWorkflowCode(workflow)), 0644); err != nil { + return fmt.Errorf("write workflow code: %w", err) + } + + // Write metadata + metaPath := filepath.Join(workflowDir, "workflow.json") + meta, _ := json.MarshalIndent(workflow, "", " ") + if err := os.WriteFile(metaPath, meta, 0644); err != nil { + return fmt.Errorf("write metadata: %w", err) + } + + return nil +} + +// wrapWorkflowCode adds OpenWorkflow runtime support to the user's code. 
+func (e *ExecAdapter) wrapWorkflowCode(workflow *WorkflowDefinition) string { + switch workflow.Runtime { + case "node", "javascript": + return e.wrapNodeWorkflow(workflow) + case "python": + return e.wrapPythonWorkflow(workflow) + default: + return e.wrapNodeWorkflow(workflow) + } +} + +func (e *ExecAdapter) wrapNodeWorkflow(workflow *WorkflowDefinition) string { + return fmt.Sprintf(`#!/usr/bin/env node +// OpenWorkflow Runner: %s v%s +// Generated by ty-openworkflow + +const fs = require('fs'); +const path = require('path'); + +const WORKFLOW_ID = "%s"; +const RUN_ID = process.env.RUN_ID; +const STATE_DIR = process.env.STATE_DIR || '.'; +const WEBHOOK_URL = process.env.WEBHOOK_URL || ''; + +// State management +function getStateFile() { + return path.join(STATE_DIR, RUN_ID + '.json'); +} + +function loadState() { + try { + return JSON.parse(fs.readFileSync(getStateFile(), 'utf8')); + } catch { + return { steps: {} }; + } +} + +function saveState(state) { + fs.writeFileSync(getStateFile(), JSON.stringify(state, null, 2)); +} + +// Step execution with memoization (deterministic replay) +async function step(name, fn) { + const state = loadState(); + + // Check for cached result + if (state.steps[name] && state.steps[name].status === 'completed') { + return state.steps[name].output; + } + + // Execute step + const startedAt = new Date().toISOString(); + try { + const output = await fn(); + state.steps[name] = { + status: 'completed', + output, + startedAt, + completedAt: new Date().toISOString() + }; + saveState(state); + return output; + } catch (error) { + state.steps[name] = { + status: 'failed', + error: error.message, + startedAt, + completedAt: new Date().toISOString() + }; + saveState(state); + throw error; + } +} + +// Durable sleep +async function sleep(name, durationMs) { + const state = loadState(); + const sleepKey = name + ':sleep'; + + if (state.steps[sleepKey] && state.steps[sleepKey].status === 'completed') { + return; + } + + if 
(state.steps[sleepKey] && state.steps[sleepKey].status === 'sleeping') { + const wakeAt = new Date(state.steps[sleepKey].wakeAt); + if (new Date() >= wakeAt) { + state.steps[sleepKey].status = 'completed'; + state.steps[sleepKey].completedAt = new Date().toISOString(); + saveState(state); + return; + } + // Still sleeping + const remaining = wakeAt - new Date(); + await new Promise(resolve => setTimeout(resolve, remaining)); + state.steps[sleepKey].status = 'completed'; + state.steps[sleepKey].completedAt = new Date().toISOString(); + saveState(state); + return; + } + + // Start sleeping + const wakeAt = new Date(Date.now() + durationMs); + state.steps[sleepKey] = { + status: 'sleeping', + wakeAt: wakeAt.toISOString(), + startedAt: new Date().toISOString() + }; + saveState(state); + + await new Promise(resolve => setTimeout(resolve, durationMs)); + + state.steps[sleepKey].status = 'completed'; + state.steps[sleepKey].completedAt = new Date().toISOString(); + saveState(state); +} + +// User workflow code +%s + +// Main execution +async function main() { + const input = JSON.parse(process.env.INPUT || '{}'); + + try { + const output = await workflow(input, { step, sleep }); + + // Report completion + console.log(JSON.stringify({ status: 'completed', output })); + + if (WEBHOOK_URL) { + const https = require('https'); + const http = require('http'); + const url = new URL(WEBHOOK_URL); + const client = url.protocol === 'https:' ? https : http; + const req = client.request(url, { + method: 'POST', + headers: { 'Content-Type': 'application/json' } + }); + req.write(JSON.stringify({ runId: RUN_ID, status: 'completed', output })); + req.end(); + } + + process.exit(0); + } catch (error) { + console.error(JSON.stringify({ status: 'failed', error: error.message })); + + if (WEBHOOK_URL) { + const https = require('https'); + const http = require('http'); + const url = new URL(WEBHOOK_URL); + const client = url.protocol === 'https:' ? 
https : http; + const req = client.request(url, { + method: 'POST', + headers: { 'Content-Type': 'application/json' } + }); + req.write(JSON.stringify({ runId: RUN_ID, status: 'failed', error: error.message })); + req.end(); + } + + process.exit(1); + } +} + +main(); +`, workflow.Name, workflow.Version, workflow.ID, workflow.Code) +} + +func (e *ExecAdapter) wrapPythonWorkflow(workflow *WorkflowDefinition) string { + return fmt.Sprintf(`#!/usr/bin/env python3 +# OpenWorkflow Runner: %s v%s +# Generated by ty-openworkflow + +import os +import json +import time +import urllib.request +from datetime import datetime +from pathlib import Path + +WORKFLOW_ID = "%s" +RUN_ID = os.environ.get("RUN_ID", "") +STATE_DIR = Path(os.environ.get("STATE_DIR", ".")) +WEBHOOK_URL = os.environ.get("WEBHOOK_URL", "") + +def get_state_file(): + return STATE_DIR / f"{RUN_ID}.json" + +def load_state(): + try: + with open(get_state_file()) as f: + return json.load(f) + except: + return {"steps": {}} + +def save_state(state): + with open(get_state_file(), "w") as f: + json.dump(state, f, indent=2) + +def step(name, fn): + """Execute a step with memoization (deterministic replay).""" + state = load_state() + + # Check for cached result + if name in state["steps"] and state["steps"][name].get("status") == "completed": + return state["steps"][name]["output"] + + # Execute step + started_at = datetime.now().isoformat() + try: + output = fn() + state["steps"][name] = { + "status": "completed", + "output": output, + "startedAt": started_at, + "completedAt": datetime.now().isoformat() + } + save_state(state) + return output + except Exception as e: + state["steps"][name] = { + "status": "failed", + "error": str(e), + "startedAt": started_at, + "completedAt": datetime.now().isoformat() + } + save_state(state) + raise + +def sleep_step(name, duration_seconds): + """Durable sleep that survives restarts.""" + state = load_state() + sleep_key = f"{name}:sleep" + + if sleep_key in state["steps"]: + 
step_state = state["steps"][sleep_key] + if step_state.get("status") == "completed": + return + + if step_state.get("status") == "sleeping": + wake_at = datetime.fromisoformat(step_state["wakeAt"]) + now = datetime.now() + if now >= wake_at: + state["steps"][sleep_key]["status"] = "completed" + state["steps"][sleep_key]["completedAt"] = now.isoformat() + save_state(state) + return + # Wait for remaining time + remaining = (wake_at - now).total_seconds() + time.sleep(remaining) + state["steps"][sleep_key]["status"] = "completed" + state["steps"][sleep_key]["completedAt"] = datetime.now().isoformat() + save_state(state) + return + + # Start sleeping + wake_at = datetime.now().timestamp() + duration_seconds + state["steps"][sleep_key] = { + "status": "sleeping", + "wakeAt": datetime.fromtimestamp(wake_at).isoformat(), + "startedAt": datetime.now().isoformat() + } + save_state(state) + + time.sleep(duration_seconds) + + state["steps"][sleep_key]["status"] = "completed" + state["steps"][sleep_key]["completedAt"] = datetime.now().isoformat() + save_state(state) + +class WorkflowContext: + def __init__(self): + self.step = step + self.sleep = sleep_step + +# User workflow code +%s + +def main(): + input_data = json.loads(os.environ.get("INPUT", "{}")) + ctx = WorkflowContext() + + try: + output = workflow(input_data, ctx) + print(json.dumps({"status": "completed", "output": output})) + + if WEBHOOK_URL: + req = urllib.request.Request( + WEBHOOK_URL, + data=json.dumps({"runId": RUN_ID, "status": "completed", "output": output}).encode(), + headers={"Content-Type": "application/json"}, + method="POST" + ) + urllib.request.urlopen(req) + + except Exception as e: + print(json.dumps({"status": "failed", "error": str(e)})) + + if WEBHOOK_URL: + req = urllib.request.Request( + WEBHOOK_URL, + data=json.dumps({"runId": RUN_ID, "status": "failed", "error": str(e)}).encode(), + headers={"Content-Type": "application/json"}, + method="POST" + ) + urllib.request.urlopen(req) + + exit(1) 
+ +if __name__ == "__main__": + main() +`, workflow.Name, workflow.Version, workflow.ID, workflow.Code) +} + +// Start initiates a new workflow run locally. +func (e *ExecAdapter) Start(ctx context.Context, workflowID string, input map[string]any) (*WorkflowRun, error) { + // Generate run ID + runID := fmt.Sprintf("%s-%d", workflowID, time.Now().UnixNano()) + + // Create run directory + runDir := filepath.Join(e.workDir, "runs", runID) + if err := os.MkdirAll(runDir, 0755); err != nil { + return nil, fmt.Errorf("create run dir: %w", err) + } + + // Load workflow + workflowDir := filepath.Join(e.workDir, "workflows", workflowID) + metaPath := filepath.Join(workflowDir, "workflow.json") + metaData, err := os.ReadFile(metaPath) + if err != nil { + return nil, fmt.Errorf("read workflow metadata: %w", err) + } + + var workflow WorkflowDefinition + if err := json.Unmarshal(metaData, &workflow); err != nil { + return nil, fmt.Errorf("parse workflow metadata: %w", err) + } + + // Determine executable + var codePath string + var cmd *exec.Cmd + + switch workflow.Runtime { + case "node", "javascript": + codePath = filepath.Join(workflowDir, "workflow.js") + cmd = exec.CommandContext(ctx, "node", codePath) + case "python": + codePath = filepath.Join(workflowDir, "workflow.py") + cmd = exec.CommandContext(ctx, "python3", codePath) + default: + codePath = filepath.Join(workflowDir, "workflow.js") + cmd = exec.CommandContext(ctx, "node", codePath) + } + + // Set environment + inputJSON, _ := json.Marshal(input) + cmd.Env = append(os.Environ(), + "RUN_ID="+runID, + "STATE_DIR="+runDir, + "INPUT="+string(inputJSON), + "WEBHOOK_URL="+e.webhookURL, + ) + + // Capture output + var stdout, stderr bytes.Buffer + cmd.Stdout = &stdout + cmd.Stderr = &stderr + + // Create run record + run := &WorkflowRun{ + ID: runID, + WorkflowID: workflowID, + Status: StatusRunning, + Input: input, + StartedAt: time.Now(), + } + + e.mu.Lock() + e.runs[runID] = run + e.processes[runID] = cmd + 
e.mu.Unlock() + + // Start async execution + go func() { + err := cmd.Run() + + e.mu.Lock() + defer e.mu.Unlock() + + now := time.Now() + run.CompletedAt = &now + + if err != nil { + run.Status = StatusFailed + run.Error = stderr.String() + if run.Error == "" { + run.Error = err.Error() + } + } else { + // Parse output + var result struct { + Status string `json:"status"` + Output map[string]any `json:"output"` + Error string `json:"error"` + } + if err := json.Unmarshal(stdout.Bytes(), &result); err == nil { + if result.Status == "completed" { + run.Status = StatusCompleted + run.Output = result.Output + } else { + run.Status = StatusFailed + run.Error = result.Error + } + } else { + run.Status = StatusCompleted + run.Output = map[string]any{"raw": stdout.String()} + } + } + + // Write final state + stateData, _ := json.MarshalIndent(run, "", " ") + os.WriteFile(filepath.Join(runDir, "result.json"), stateData, 0644) + + delete(e.processes, runID) + }() + + return run, nil +} + +// Status retrieves the current status of a workflow run. +func (e *ExecAdapter) Status(ctx context.Context, runID string) (*WorkflowRun, error) { + e.mu.RLock() + defer e.mu.RUnlock() + + run, ok := e.runs[runID] + if !ok { + // Try to load from disk + runDir := filepath.Join(e.workDir, "runs", runID) + resultPath := filepath.Join(runDir, "result.json") + data, err := os.ReadFile(resultPath) + if err != nil { + return nil, fmt.Errorf("run not found: %s", runID) + } + + var diskRun WorkflowRun + if err := json.Unmarshal(data, &diskRun); err != nil { + return nil, fmt.Errorf("parse run result: %w", err) + } + return &diskRun, nil + } + + return run, nil +} + +// Cancel attempts to cancel a running workflow. 
+func (e *ExecAdapter) Cancel(ctx context.Context, runID string) error { + e.mu.Lock() + defer e.mu.Unlock() + + cmd, ok := e.processes[runID] + if !ok { + return fmt.Errorf("process not found or already completed") + } + + if cmd.Process != nil { + cmd.Process.Kill() + } + + if run, ok := e.runs[runID]; ok { + run.Status = StatusCanceled + now := time.Now() + run.CompletedAt = &now + } + + return nil +} + +// Logs retrieves execution logs for a workflow run. +func (e *ExecAdapter) Logs(ctx context.Context, runID string) ([]string, error) { + runDir := filepath.Join(e.workDir, "runs", runID) + + // Check for log files + var logs []string + + logPath := filepath.Join(runDir, "output.log") + if data, err := os.ReadFile(logPath); err == nil { + logs = append(logs, string(data)) + } + + return logs, nil +} + +// Cleanup removes completed workflow resources. +func (e *ExecAdapter) Cleanup(ctx context.Context, runID string) error { + e.mu.Lock() + delete(e.runs, runID) + delete(e.processes, runID) + e.mu.Unlock() + + runDir := filepath.Join(e.workDir, "runs", runID) + return os.RemoveAll(runDir) +} + +// ListRuns returns recent workflow runs. 
+func (e *ExecAdapter) ListRuns(ctx context.Context, workflowID string, limit int) ([]*WorkflowRun, error) { + e.mu.RLock() + defer e.mu.RUnlock() + + var runs []*WorkflowRun + for _, run := range e.runs { + if workflowID == "" || run.WorkflowID == workflowID { + runs = append(runs, run) + if limit > 0 && len(runs) >= limit { + break + } + } + } + + return runs, nil +} diff --git a/extensions/ty-openworkflow/internal/compute/exec_test.go b/extensions/ty-openworkflow/internal/compute/exec_test.go new file mode 100644 index 00000000..bc434c77 --- /dev/null +++ b/extensions/ty-openworkflow/internal/compute/exec_test.go @@ -0,0 +1,212 @@ +package compute + +import ( + "context" + "os" + "path/filepath" + "testing" + "time" +) + +func TestExecAdapter_Name(t *testing.T) { + adapter := NewExecAdapter(ExecConfig{}) + if adapter.Name() != "exec" { + t.Errorf("expected name 'exec', got '%s'", adapter.Name()) + } +} + +func TestExecAdapter_IsAvailable(t *testing.T) { + adapter := NewExecAdapter(ExecConfig{}) + if !adapter.IsAvailable() { + t.Error("exec adapter should always be available") + } +} + +func TestExecAdapter_Deploy(t *testing.T) { + tmpDir := t.TempDir() + adapter := NewExecAdapter(ExecConfig{WorkDir: tmpDir}) + + workflow := &WorkflowDefinition{ + ID: "test-workflow", + Name: "Test Workflow", + Version: "1.0.0", + Runtime: "node", + Code: ` +async function workflow(input, { step }) { + return await step("greet", () => "Hello, " + input.name); +} +`, + } + + err := adapter.Deploy(context.Background(), workflow) + if err != nil { + t.Fatalf("deploy failed: %v", err) + } + + // Check workflow files exist + workflowDir := filepath.Join(tmpDir, "workflows", "test-workflow") + if _, err := os.Stat(filepath.Join(workflowDir, "workflow.js")); os.IsNotExist(err) { + t.Error("workflow.js was not created") + } + if _, err := os.Stat(filepath.Join(workflowDir, "workflow.json")); os.IsNotExist(err) { + t.Error("workflow.json was not created") + } +} + +func 
TestExecAdapter_DeployPython(t *testing.T) { + tmpDir := t.TempDir() + adapter := NewExecAdapter(ExecConfig{WorkDir: tmpDir}) + + workflow := &WorkflowDefinition{ + ID: "python-workflow", + Name: "Python Workflow", + Version: "1.0.0", + Runtime: "python", + Code: ` +def workflow(input, ctx): + return ctx.step("greet", lambda: "Hello, " + input["name"]) +`, + } + + err := adapter.Deploy(context.Background(), workflow) + if err != nil { + t.Fatalf("deploy failed: %v", err) + } + + // Check workflow files exist + workflowDir := filepath.Join(tmpDir, "workflows", "python-workflow") + if _, err := os.Stat(filepath.Join(workflowDir, "workflow.py")); os.IsNotExist(err) { + t.Error("workflow.py was not created") + } +} + +func TestExecAdapter_StartAndStatus(t *testing.T) { + // Skip if node is not installed + if _, err := os.Stat("/usr/bin/node"); os.IsNotExist(err) { + if _, err := os.Stat("/usr/local/bin/node"); os.IsNotExist(err) { + t.Skip("node not installed") + } + } + + tmpDir := t.TempDir() + adapter := NewExecAdapter(ExecConfig{WorkDir: tmpDir}) + + // Deploy a simple workflow + workflow := &WorkflowDefinition{ + ID: "quick-workflow", + Name: "Quick Workflow", + Version: "1.0.0", + Runtime: "node", + Code: ` +async function workflow(input, { step }) { + return await step("result", () => ({ greeting: "Hello, " + input.name })); +} +`, + } + + err := adapter.Deploy(context.Background(), workflow) + if err != nil { + t.Fatalf("deploy failed: %v", err) + } + + // Start the workflow + input := map[string]any{"name": "World"} + run, err := adapter.Start(context.Background(), "quick-workflow", input) + if err != nil { + t.Fatalf("start failed: %v", err) + } + + if run.ID == "" { + t.Error("run ID should not be empty") + } + if run.WorkflowID != "quick-workflow" { + t.Errorf("expected workflow ID 'quick-workflow', got '%s'", run.WorkflowID) + } + + // Wait for completion + time.Sleep(2 * time.Second) + + // Check status + status, err := 
adapter.Status(context.Background(), run.ID) + if err != nil { + t.Fatalf("status failed: %v", err) + } + + if status.Status != StatusCompleted { + t.Errorf("expected status 'completed', got '%s'", status.Status) + } + + if status.Output == nil { + t.Error("output should not be nil") + } +} + +func TestExecAdapter_ListRuns(t *testing.T) { + tmpDir := t.TempDir() + adapter := NewExecAdapter(ExecConfig{WorkDir: tmpDir}) + + runs, err := adapter.ListRuns(context.Background(), "", 10) + if err != nil { + t.Fatalf("list runs failed: %v", err) + } + + // Should be empty initially + if len(runs) != 0 { + t.Errorf("expected 0 runs, got %d", len(runs)) + } +} + +func TestWorkflowDefinition(t *testing.T) { + workflow := &WorkflowDefinition{ + ID: "test", + Name: "Test", + Description: "A test workflow", + Version: "1.0.0", + Runtime: "node", + Code: "function workflow() {}", + Timeout: 5 * time.Minute, + MaxRetries: 3, + } + + if workflow.ID != "test" { + t.Errorf("expected ID 'test', got '%s'", workflow.ID) + } + if workflow.Timeout != 5*time.Minute { + t.Errorf("expected timeout 5m, got %v", workflow.Timeout) + } +} + +func TestWorkflowRun(t *testing.T) { + now := time.Now() + run := &WorkflowRun{ + ID: "run-123", + WorkflowID: "workflow-1", + Status: StatusRunning, + Input: map[string]any{"key": "value"}, + StartedAt: now, + } + + if run.Status != StatusRunning { + t.Errorf("expected status running, got %s", run.Status) + } + if run.CompletedAt != nil { + t.Error("completed_at should be nil for running workflow") + } +} + +func TestStepAttempt(t *testing.T) { + now := time.Now() + step := StepAttempt{ + ID: "step-1", + StepName: "fetch-data", + StepType: StepTypeRun, + Status: StatusCompleted, + Input: map[string]any{"url": "https://example.com"}, + Output: map[string]any{"data": "result"}, + StartedAt: now, + } + + if step.StepType != StepTypeRun { + t.Errorf("expected step type 'run', got '%s'", step.StepType) + } +} diff --git 
a/extensions/ty-openworkflow/internal/config/config.go b/extensions/ty-openworkflow/internal/config/config.go new file mode 100644 index 00000000..4f75003e --- /dev/null +++ b/extensions/ty-openworkflow/internal/config/config.go @@ -0,0 +1,214 @@ +// Package config handles ty-openworkflow configuration. +package config + +import ( + "fmt" + "os" + "os/exec" + "path/filepath" + "strings" + "time" + + "gopkg.in/yaml.v3" +) + +// Config holds the complete configuration. +type Config struct { + // DataDir is where state and workflow files are stored + DataDir string `yaml:"data_dir"` + + // DefaultAdapter is the default compute adapter to use + DefaultAdapter string `yaml:"default_adapter"` + + // Adapters holds configuration for each compute adapter + Adapters AdaptersConfig `yaml:"adapters"` + + // Webhook configuration for receiving callbacks + Webhook WebhookConfig `yaml:"webhook"` + + // TaskYou configuration + TaskYou TaskYouConfig `yaml:"taskyou"` + + // Polling configuration + PollInterval time.Duration `yaml:"poll_interval"` +} + +// AdaptersConfig holds configuration for all adapters. +type AdaptersConfig struct { + Exec ExecAdapterConfig `yaml:"exec"` + Docker DockerAdapterConfig `yaml:"docker"` + Cloudflare CloudflareAdapterConfig `yaml:"cloudflare"` +} + +// ExecAdapterConfig holds local exec adapter configuration. +type ExecAdapterConfig struct { + Enabled bool `yaml:"enabled"` + WorkDir string `yaml:"work_dir"` +} + +// DockerAdapterConfig holds Docker adapter configuration. +type DockerAdapterConfig struct { + Enabled bool `yaml:"enabled"` + WorkDir string `yaml:"work_dir"` + Network string `yaml:"network"` +} + +// CloudflareAdapterConfig holds Cloudflare Workers configuration. 
+type CloudflareAdapterConfig struct { + Enabled bool `yaml:"enabled"` + AccountID string `yaml:"account_id"` + APIToken string `yaml:"api_token"` + APITokenCmd string `yaml:"api_token_cmd"` // Command to retrieve API token + Namespace string `yaml:"namespace"` // KV namespace for state +} + +// WebhookConfig holds webhook server configuration. +type WebhookConfig struct { + Enabled bool `yaml:"enabled"` + Port int `yaml:"port"` + Host string `yaml:"host"` + Path string `yaml:"path"` + // External URL for compute platforms to callback to + ExternalURL string `yaml:"external_url"` +} + +// TaskYouConfig holds TaskYou integration configuration. +type TaskYouConfig struct { + // Path to ty CLI binary + CLI string `yaml:"cli"` + // Default project for tasks + Project string `yaml:"project"` + // Whether to auto-create tasks for workflow runs + AutoCreateTasks bool `yaml:"auto_create_tasks"` +} + +// DefaultConfig returns the default configuration. +func DefaultConfig() *Config { + homeDir, _ := os.UserHomeDir() + dataDir := filepath.Join(homeDir, ".config", "ty-openworkflow") + + return &Config{ + DataDir: dataDir, + DefaultAdapter: "exec", + Adapters: AdaptersConfig{ + Exec: ExecAdapterConfig{ + Enabled: true, + WorkDir: filepath.Join(dataDir, "exec"), + }, + Docker: DockerAdapterConfig{ + Enabled: false, + WorkDir: filepath.Join(dataDir, "docker"), + }, + Cloudflare: CloudflareAdapterConfig{ + Enabled: false, + }, + }, + Webhook: WebhookConfig{ + Enabled: false, + Port: 8765, + Host: "localhost", + Path: "/webhook", + }, + TaskYou: TaskYouConfig{ + CLI: "ty", + AutoCreateTasks: true, + }, + PollInterval: 30 * time.Second, + } +} + +// ConfigPath returns the default configuration file path. +func ConfigPath() string { + homeDir, _ := os.UserHomeDir() + return filepath.Join(homeDir, ".config", "ty-openworkflow", "config.yaml") +} + +// Load loads configuration from the given path. 
+func Load(path string) (*Config, error) { + cfg := DefaultConfig() + + data, err := os.ReadFile(path) + if err != nil { + if os.IsNotExist(err) { + return cfg, nil // Use defaults if no config file + } + return nil, fmt.Errorf("read config: %w", err) + } + + if err := yaml.Unmarshal(data, cfg); err != nil { + return nil, fmt.Errorf("parse config: %w", err) + } + + // Resolve API tokens from commands if specified + if cfg.Adapters.Cloudflare.APITokenCmd != "" && cfg.Adapters.Cloudflare.APIToken == "" { + token, err := runCommand(cfg.Adapters.Cloudflare.APITokenCmd) + if err != nil { + return nil, fmt.Errorf("get cloudflare api token: %w", err) + } + cfg.Adapters.Cloudflare.APIToken = token + } + + return cfg, nil +} + +// Save saves configuration to the given path. +func Save(cfg *Config, path string) error { + // Ensure directory exists + dir := filepath.Dir(path) + if err := os.MkdirAll(dir, 0755); err != nil { + return fmt.Errorf("create config dir: %w", err) + } + + data, err := yaml.Marshal(cfg) + if err != nil { + return fmt.Errorf("marshal config: %w", err) + } + + if err := os.WriteFile(path, data, 0600); err != nil { + return fmt.Errorf("write config: %w", err) + } + + return nil +} + +// runCommand executes a shell command and returns its output. +func runCommand(command string) (string, error) { + cmd := exec.Command("sh", "-c", command) + output, err := cmd.Output() + if err != nil { + return "", err + } + return strings.TrimSpace(string(output)), nil +} + +// Validate checks the configuration for errors. 
+func (c *Config) Validate() error { + // Check that at least one adapter is enabled + hasAdapter := c.Adapters.Exec.Enabled || c.Adapters.Docker.Enabled || c.Adapters.Cloudflare.Enabled + if !hasAdapter { + return fmt.Errorf("at least one compute adapter must be enabled") + } + + // Validate Cloudflare config if enabled + if c.Adapters.Cloudflare.Enabled { + if c.Adapters.Cloudflare.AccountID == "" { + return fmt.Errorf("cloudflare account_id is required when cloudflare adapter is enabled") + } + if c.Adapters.Cloudflare.APIToken == "" && c.Adapters.Cloudflare.APITokenCmd == "" { + return fmt.Errorf("cloudflare api_token or api_token_cmd is required when cloudflare adapter is enabled") + } + } + + return nil +} + +// WebhookURL returns the full webhook URL for compute platforms to call back to. +func (c *Config) WebhookURL() string { + if c.Webhook.ExternalURL != "" { + return c.Webhook.ExternalURL + } + if !c.Webhook.Enabled { + return "" + } + return fmt.Sprintf("http://%s:%d%s", c.Webhook.Host, c.Webhook.Port, c.Webhook.Path) +} diff --git a/extensions/ty-openworkflow/internal/config/config_test.go b/extensions/ty-openworkflow/internal/config/config_test.go new file mode 100644 index 00000000..6689157f --- /dev/null +++ b/extensions/ty-openworkflow/internal/config/config_test.go @@ -0,0 +1,225 @@ +package config + +import ( + "os" + "path/filepath" + "testing" + "time" +) + +func TestDefaultConfig(t *testing.T) { + cfg := DefaultConfig() + + if cfg.DefaultAdapter != "exec" { + t.Errorf("expected default adapter 'exec', got '%s'", cfg.DefaultAdapter) + } + + if !cfg.Adapters.Exec.Enabled { + t.Error("exec adapter should be enabled by default") + } + + if cfg.Adapters.Docker.Enabled { + t.Error("docker adapter should be disabled by default") + } + + if cfg.Adapters.Cloudflare.Enabled { + t.Error("cloudflare adapter should be disabled by default") + } + + if cfg.TaskYou.CLI != "ty" { + t.Errorf("expected ty CLI 'ty', got '%s'", cfg.TaskYou.CLI) + } + + if 
cfg.PollInterval != 30*time.Second { + t.Errorf("expected poll interval 30s, got %v", cfg.PollInterval) + } +} + +func TestLoadDefault(t *testing.T) { + // Load from non-existent file should return defaults + cfg, err := Load("/non/existent/path/config.yaml") + if err != nil { + t.Fatalf("load failed: %v", err) + } + + if cfg.DefaultAdapter != "exec" { + t.Errorf("expected default adapter 'exec', got '%s'", cfg.DefaultAdapter) + } +} + +func TestSaveAndLoad(t *testing.T) { + tmpDir := t.TempDir() + path := filepath.Join(tmpDir, "config.yaml") + + cfg := DefaultConfig() + cfg.DefaultAdapter = "docker" + cfg.Adapters.Docker.Enabled = true + cfg.Adapters.Docker.Network = "my-network" + cfg.TaskYou.Project = "my-project" + + // Save + if err := Save(cfg, path); err != nil { + t.Fatalf("save failed: %v", err) + } + + // Verify file exists + if _, err := os.Stat(path); os.IsNotExist(err) { + t.Error("config file was not created") + } + + // Load + loaded, err := Load(path) + if err != nil { + t.Fatalf("load failed: %v", err) + } + + if loaded.DefaultAdapter != "docker" { + t.Errorf("expected default adapter 'docker', got '%s'", loaded.DefaultAdapter) + } + if !loaded.Adapters.Docker.Enabled { + t.Error("docker adapter should be enabled") + } + if loaded.Adapters.Docker.Network != "my-network" { + t.Errorf("expected network 'my-network', got '%s'", loaded.Adapters.Docker.Network) + } + if loaded.TaskYou.Project != "my-project" { + t.Errorf("expected project 'my-project', got '%s'", loaded.TaskYou.Project) + } +} + +func TestValidate(t *testing.T) { + tests := []struct { + name string + cfg *Config + expectErr bool + }{ + { + name: "valid default config", + cfg: DefaultConfig(), + expectErr: false, + }, + { + name: "no adapters enabled", + cfg: &Config{ + Adapters: AdaptersConfig{ + Exec: ExecAdapterConfig{Enabled: false}, + Docker: DockerAdapterConfig{Enabled: false}, + Cloudflare: CloudflareAdapterConfig{Enabled: false}, + }, + }, + expectErr: true, + }, + { + name: 
"cloudflare without account_id", + cfg: &Config{ + Adapters: AdaptersConfig{ + Cloudflare: CloudflareAdapterConfig{ + Enabled: true, + AccountID: "", + APIToken: "token", + }, + }, + }, + expectErr: true, + }, + { + name: "cloudflare without api_token", + cfg: &Config{ + Adapters: AdaptersConfig{ + Cloudflare: CloudflareAdapterConfig{ + Enabled: true, + AccountID: "account", + APIToken: "", + }, + }, + }, + expectErr: true, + }, + { + name: "valid cloudflare config", + cfg: &Config{ + Adapters: AdaptersConfig{ + Cloudflare: CloudflareAdapterConfig{ + Enabled: true, + AccountID: "account", + APIToken: "token", + }, + }, + }, + expectErr: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := tt.cfg.Validate() + if tt.expectErr && err == nil { + t.Error("expected error but got none") + } + if !tt.expectErr && err != nil { + t.Errorf("unexpected error: %v", err) + } + }) + } +} + +func TestWebhookURL(t *testing.T) { + tests := []struct { + name string + cfg *Config + expected string + }{ + { + name: "webhook disabled", + cfg: &Config{ + Webhook: WebhookConfig{ + Enabled: false, + }, + }, + expected: "", + }, + { + name: "external URL set", + cfg: &Config{ + Webhook: WebhookConfig{ + Enabled: true, + ExternalURL: "https://example.com/webhook", + }, + }, + expected: "https://example.com/webhook", + }, + { + name: "local webhook", + cfg: &Config{ + Webhook: WebhookConfig{ + Enabled: true, + Host: "localhost", + Port: 8765, + Path: "/webhook", + }, + }, + expected: "http://localhost:8765/webhook", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + url := tt.cfg.WebhookURL() + if url != tt.expected { + t.Errorf("expected URL '%s', got '%s'", tt.expected, url) + } + }) + } +} + +func TestConfigPath(t *testing.T) { + path := ConfigPath() + if path == "" { + t.Error("config path should not be empty") + } + + // Should contain ty-openworkflow + if filepath.Base(filepath.Dir(path)) != "ty-openworkflow" { + 
t.Errorf("config path should be in ty-openworkflow directory, got %s", path) + } +} diff --git a/extensions/ty-openworkflow/internal/runner/runner.go b/extensions/ty-openworkflow/internal/runner/runner.go new file mode 100644 index 00000000..0d560505 --- /dev/null +++ b/extensions/ty-openworkflow/internal/runner/runner.go @@ -0,0 +1,386 @@ +// Package runner orchestrates workflow execution and TaskYou integration. +package runner + +import ( + "context" + "encoding/json" + "fmt" + "log" + "sync" + "time" + + "github.com/bborn/workflow/extensions/ty-openworkflow/internal/bridge" + "github.com/bborn/workflow/extensions/ty-openworkflow/internal/compute" + "github.com/bborn/workflow/extensions/ty-openworkflow/internal/state" +) + +// Runner manages workflow execution and task integration. +type Runner struct { + factory *compute.Factory + state *state.DB + bridge *bridge.Bridge + webhook string + pollInterval time.Duration + mu sync.RWMutex +} + +// Config holds runner configuration. +type Config struct { + WebhookURL string + PollInterval time.Duration +} + +// NewRunner creates a new workflow runner. +func NewRunner(factory *compute.Factory, stateDB *state.DB, br *bridge.Bridge, cfg Config) *Runner { + interval := cfg.PollInterval + if interval == 0 { + interval = 30 * time.Second + } + + return &Runner{ + factory: factory, + state: stateDB, + bridge: br, + webhook: cfg.WebhookURL, + pollInterval: interval, + } +} + +// DeployWorkflow deploys a workflow to the specified adapter. 
+func (r *Runner) DeployWorkflow(ctx context.Context, workflow *compute.WorkflowDefinition, adapterName string) error {
+	// Resolve the requested adapter and make sure it can actually run.
+	target := r.factory.Get(adapterName)
+	switch {
+	case target == nil:
+		return fmt.Errorf("adapter not found: %s", adapterName)
+	case !target.IsAvailable():
+		return fmt.Errorf("adapter not available: %s", adapterName)
+	}
+
+	// Point the adapter at our webhook so the platform can call back.
+	if r.webhook != "" {
+		target.SetWebhook(r.webhook)
+	}
+
+	// Push the definition to the compute platform first; record it
+	// locally only once the deploy has succeeded.
+	if err := target.Deploy(ctx, workflow); err != nil {
+		return fmt.Errorf("deploy workflow: %w", err)
+	}
+
+	record := &state.Workflow{
+		ID:          workflow.ID,
+		Name:        workflow.Name,
+		Description: workflow.Description,
+		Version:     workflow.Version,
+		Runtime:     workflow.Runtime,
+		Code:        workflow.Code,
+		Adapter:     adapterName,
+	}
+	if err := r.state.SaveWorkflow(record); err != nil {
+		return fmt.Errorf("save workflow state: %w", err)
+	}
+
+	return nil
+}
+
+// StartWorkflow starts a workflow run, optionally linked to a task.
+func (r *Runner) StartWorkflow(ctx context.Context, workflowID string, input map[string]any, taskID int64) (*compute.WorkflowRun, error) {
+	// Get workflow from state
+	workflow, err := r.state.GetWorkflow(workflowID)
+	if err != nil {
+		return nil, fmt.Errorf("get workflow: %w", err)
+	}
+	if workflow == nil {
+		return nil, fmt.Errorf("workflow not found: %s", workflowID)
+	}
+
+	// Get adapter
+	adapter := r.factory.Get(workflow.Adapter)
+	if adapter == nil {
+		return nil, fmt.Errorf("adapter not found: %s", workflow.Adapter)
+	}
+
+	// Marshal the input before starting anything: if it cannot be
+	// serialized for the state record, do not launch a remote run we
+	// would immediately lose track of. (Previously the error was
+	// silently discarded and an empty input stored.)
+	inputJSON, err := json.Marshal(input)
+	if err != nil {
+		return nil, fmt.Errorf("marshal input: %w", err)
+	}
+
+	// Set webhook
+	if r.webhook != "" {
+		adapter.SetWebhook(r.webhook)
+	}
+
+	// Start workflow
+	run, err := adapter.Start(ctx, workflowID, input)
+	if err != nil {
+		return nil, fmt.Errorf("start workflow: %w", err)
+	}
+
+	// Save run to state
+	stateRun := &state.WorkflowRun{
+		ID:         run.ID,
+		WorkflowID: workflowID,
+		TaskID:     taskID,
+		Adapter:    workflow.Adapter,
+		Status:     string(run.Status),
+		Input:      string(inputJSON),
+		StartedAt:  run.StartedAt,
+	}
+
+	if err := r.state.SaveRun(stateRun); err != nil {
+		// Best-effort: stop the run we just started so it is not left
+		// orphaned on the compute platform with no local record.
+		if cancelErr := adapter.Cancel(ctx, run.ID); cancelErr != nil {
+			log.Printf("Warning: failed to cancel unrecorded run %s: %v", run.ID, cancelErr)
+		}
+		return nil, fmt.Errorf("save run state: %w", err)
+	}
+
+	// Link task to workflow if provided
+	if taskID > 0 {
+		if err := r.state.LinkTaskToWorkflow(taskID, workflowID, run.ID); err != nil {
+			log.Printf("Warning: failed to link task to workflow: %v", err)
+		}
+	}
+
+	return run, nil
+}
+
+// StartWorkflowWithTask creates a task and starts a workflow linked to it.
+func (r *Runner) StartWorkflowWithTask(ctx context.Context, workflowID string, input map[string]any, taskTitle string) (*compute.WorkflowRun, *bridge.Task, error) {
+	// Create task in TaskYou
+	body := fmt.Sprintf("Workflow: %s\nInput: %v", workflowID, input)
+	task, err := r.bridge.CreateTask(taskTitle, body, "code")
+	if err != nil {
+		return nil, nil, fmt.Errorf("create task: %w", err)
+	}
+
+	// Start workflow with task ID.
+	// On failure the already-created task is still returned so the
+	// caller can surface or clean it up.
+	run, err := r.StartWorkflow(ctx, workflowID, input, task.ID)
+	if err != nil {
+		return nil, task, fmt.Errorf("start workflow: %w", err)
+	}
+
+	return run, task, nil
+}
+
+// GetRunStatus returns the current status of a workflow run.
+func (r *Runner) GetRunStatus(ctx context.Context, runID string) (*compute.WorkflowRun, error) {
+	// Get from state
+	stateRun, err := r.state.GetRun(runID)
+	if err != nil {
+		return nil, fmt.Errorf("get run from state: %w", err)
+	}
+	if stateRun == nil {
+		return nil, fmt.Errorf("run not found: %s", runID)
+	}
+
+	// If terminal state, return from state — no need to ask the adapter.
+	if stateRun.Status == "completed" || stateRun.Status == "failed" || stateRun.Status == "canceled" {
+		// Best-effort decode of the stored JSON blobs; a corrupt blob
+		// simply yields a nil map (Unmarshal errors deliberately ignored).
+		var output map[string]any
+		if stateRun.Output != "" {
+			json.Unmarshal([]byte(stateRun.Output), &output)
+		}
+		var input map[string]any
+		if stateRun.Input != "" {
+			json.Unmarshal([]byte(stateRun.Input), &input)
+		}
+
+		return &compute.WorkflowRun{
+			ID:          stateRun.ID,
+			WorkflowID:  stateRun.WorkflowID,
+			Status:      compute.RunStatus(stateRun.Status),
+			Input:       input,
+			Output:      output,
+			Error:       stateRun.Error,
+			StartedAt:   stateRun.StartedAt,
+			CompletedAt: stateRun.CompletedAt,
+		}, nil
+	}
+
+	// Get live status from adapter
+	adapter := r.factory.Get(stateRun.Adapter)
+	if adapter == nil {
+		return nil, fmt.Errorf("adapter not found: %s", stateRun.Adapter)
+	}
+
+	run, err := adapter.Status(ctx, runID)
+	if err != nil {
+		// Return state-based status if adapter check fails (degrade to
+		// last-known status rather than erroring the whole call).
+		var input map[string]any
+		if stateRun.Input != "" {
+			json.Unmarshal([]byte(stateRun.Input), &input)
+		}
+		return &compute.WorkflowRun{
+			ID:         stateRun.ID,
+			WorkflowID: stateRun.WorkflowID,
+			Status:     compute.RunStatus(stateRun.Status),
+			Input:      input,
+			StartedAt:  stateRun.StartedAt,
+		}, nil
+	}
+
+	return run, nil
+}
+
+// CancelRun cancels a workflow run.
+func (r *Runner) CancelRun(ctx context.Context, runID string) error {
+	stateRun, err := r.state.GetRun(runID)
+	if err != nil {
+		return fmt.Errorf("get run: %w", err)
+	}
+	if stateRun == nil {
+		return fmt.Errorf("run not found: %s", runID)
+	}
+
+	adapter := r.factory.Get(stateRun.Adapter)
+	if adapter == nil {
+		return fmt.Errorf("adapter not found: %s", stateRun.Adapter)
+	}
+
+	if err := adapter.Cancel(ctx, runID); err != nil {
+		return fmt.Errorf("cancel run: %w", err)
+	}
+
+	// Update state
+	if err := r.state.UpdateRunStatus(runID, "canceled", "", ""); err != nil {
+		return fmt.Errorf("update run status: %w", err)
+	}
+
+	// Update linked task if any. Task updates are best-effort: a bridge
+	// failure is logged, not returned, since the run itself is canceled.
+	if stateRun.TaskID > 0 {
+		if err := r.bridge.FailTask(stateRun.TaskID, "Workflow canceled"); err != nil {
+			log.Printf("Warning: failed to update task status: %v", err)
+		}
+	}
+
+	return nil
+}
+
+// HandleWebhook processes incoming webhook callbacks from compute platforms.
+// status is the platform-reported status string ("completed"/"failed"/etc.).
+func (r *Runner) HandleWebhook(runID, status string, output map[string]any, errMsg string) error {
+	stateRun, err := r.state.GetRun(runID)
+	if err != nil {
+		return fmt.Errorf("get run: %w", err)
+	}
+	if stateRun == nil {
+		return fmt.Errorf("run not found: %s", runID)
+	}
+
+	// Update state
+	outputJSON := ""
+	if output != nil {
+		data, _ := json.Marshal(output)
+		outputJSON = string(data)
+	}
+
+	if err := r.state.UpdateRunStatus(runID, status, outputJSON, errMsg); err != nil {
+		return fmt.Errorf("update run status: %w", err)
+	}
+
+	// Update linked task (best-effort; bridge failures are logged only).
+	if stateRun.TaskID > 0 {
+		switch status {
+		case "completed":
+			result := "Workflow completed"
+			if outputJSON != "" {
+				result = fmt.Sprintf("Workflow completed. Output: %s", outputJSON)
+			}
+			if err := r.bridge.CompleteTask(stateRun.TaskID, result); err != nil {
+				log.Printf("Warning: failed to complete task: %v", err)
+			}
+		case "failed":
+			if err := r.bridge.FailTask(stateRun.TaskID, errMsg); err != nil {
+				log.Printf("Warning: failed to fail task: %v", err)
+			}
+		}
+	}
+
+	return nil
+}
+
+// ListWorkflows returns all deployed workflows.
+func (r *Runner) ListWorkflows() ([]*state.Workflow, error) {
+	return r.state.ListWorkflows()
+}
+
+// ListRuns returns workflow runs.
+func (r *Runner) ListRuns(workflowID, status string, limit int) ([]*state.WorkflowRun, error) {
+	return r.state.ListRuns(workflowID, status, limit)
+}
+
+// GetWorkflow retrieves a workflow by ID.
+func (r *Runner) GetWorkflow(workflowID string) (*state.Workflow, error) {
+	return r.state.GetWorkflow(workflowID)
+}
+
+// DeleteWorkflow removes a workflow.
+func (r *Runner) DeleteWorkflow(workflowID string) error {
+	return r.state.DeleteWorkflow(workflowID)
+}
+
+// PollPendingRuns checks for and updates pending workflow runs.
+func (r *Runner) PollPendingRuns(ctx context.Context) error {
+	runs, err := r.state.GetPendingRuns()
+	if err != nil {
+		return fmt.Errorf("get pending runs: %w", err)
+	}
+
+	for _, stateRun := range runs {
+		adapter := r.factory.Get(stateRun.Adapter)
+		if adapter == nil {
+			continue
+		}
+
+		run, err := adapter.Status(ctx, stateRun.ID)
+		if err != nil {
+			log.Printf("Warning: failed to get run status for %s: %v", stateRun.ID, err)
+			continue
+		}
+
+		// Update if status changed
+		if string(run.Status) != stateRun.Status {
+			outputJSON := ""
+			if run.Output != nil {
+				data, _ := json.Marshal(run.Output)
+				outputJSON = string(data)
+			}
+
+			if err := r.state.UpdateRunStatus(stateRun.ID, string(run.Status), outputJSON, run.Error); err != nil {
+				log.Printf("Warning: failed to update run status: %v", err)
+				continue
+			}
+
+			// Update linked task. Bridge failures are logged rather than
+			// silently dropped, matching HandleWebhook's behavior.
+			if stateRun.TaskID > 0 {
+				switch run.Status {
+				case compute.StatusCompleted:
+					result := "Workflow completed"
+					if outputJSON != "" {
+						result = fmt.Sprintf("Workflow completed. Output: %s", outputJSON)
+					}
+					if err := r.bridge.CompleteTask(stateRun.TaskID, result); err != nil {
+						log.Printf("Warning: failed to complete task: %v", err)
+					}
+				case compute.StatusFailed:
+					if err := r.bridge.FailTask(stateRun.TaskID, run.Error); err != nil {
+						log.Printf("Warning: failed to fail task: %v", err)
+					}
+				}
+			}
+		}
+	}
+
+	return nil
+}
+
+// StartPolling begins background polling for run status updates.
+// It blocks until ctx is canceled; run it in its own goroutine.
+func (r *Runner) StartPolling(ctx context.Context) {
+	ticker := time.NewTicker(r.pollInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			if err := r.PollPendingRuns(ctx); err != nil {
+				log.Printf("Warning: poll pending runs failed: %v", err)
+			}
+		}
+	}
+}
+
+// GetRunForTask retrieves the workflow run associated with a task.
+func (r *Runner) GetRunForTask(taskID int64) (*state.WorkflowRun, error) {
+	return r.state.GetRunByTaskID(taskID)
+}
diff --git a/extensions/ty-openworkflow/internal/state/state.go b/extensions/ty-openworkflow/internal/state/state.go
new file mode 100644
index 00000000..30a707c0
--- /dev/null
+++ b/extensions/ty-openworkflow/internal/state/state.go
@@ -0,0 +1,325 @@
+// Package state manages persistent state for workflow runs and task associations.
+package state
+
+import (
+	"database/sql"
+	"fmt"
+	"path/filepath"
+	"time"
+
+	_ "modernc.org/sqlite"
+)
+
+// DB manages the SQLite state database.
+type DB struct {
+	db *sql.DB
+}
+
+// WorkflowRun represents a workflow run record in the database.
+type WorkflowRun struct {
+	ID          string
+	WorkflowID  string
+	TaskID      int64 // Associated TaskYou task
+	Adapter     string
+	Status      string
+	Input       string // JSON
+	Output      string // JSON
+	Error       string
+	StartedAt   time.Time
+	CompletedAt *time.Time
+	CreatedAt   time.Time
+	UpdatedAt   time.Time
+}
+
+// Workflow represents a deployed workflow definition.
+type Workflow struct {
+	ID          string
+	Name        string
+	Description string
+	Version     string
+	Runtime     string
+	Code        string
+	Adapter     string
+	CreatedAt   time.Time
+	UpdatedAt   time.Time
+}
+
+// Open opens or creates the state database.
+func Open(dir string) (*DB, error) {
+	dbPath := filepath.Join(dir, "state.db")
+
+	// modernc.org/sqlite uses "_pragma=name(value)" DSN parameters, not
+	// the mattn/go-sqlite3 style "_journal"/"_timeout" keys used before,
+	// so WAL journaling and the busy timeout were never actually applied.
+	db, err := sql.Open("sqlite", dbPath+"?_pragma=journal_mode(WAL)&_pragma=busy_timeout(5000)")
+	if err != nil {
+		return nil, fmt.Errorf("open database: %w", err)
+	}
+
+	if err := migrate(db); err != nil {
+		db.Close()
+		return nil, fmt.Errorf("migrate: %w", err)
+	}
+
+	return &DB{db: db}, nil
+}
+
+// migrate creates the database schema.
+func migrate(db *sql.DB) error {
+	// Idempotent: every statement uses IF NOT EXISTS, so this is safe to
+	// run on every Open.
+	// NOTE(review): the FOREIGN KEY clauses are only enforced when the
+	// connection sets PRAGMA foreign_keys=ON — confirm it is enabled.
+	schema := `
+	CREATE TABLE IF NOT EXISTS workflows (
+		id TEXT PRIMARY KEY,
+		name TEXT NOT NULL,
+		description TEXT,
+		version TEXT NOT NULL,
+		runtime TEXT NOT NULL,
+		code TEXT NOT NULL,
+		adapter TEXT NOT NULL,
+		created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
+		updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
+	);
+
+	CREATE TABLE IF NOT EXISTS workflow_runs (
+		id TEXT PRIMARY KEY,
+		workflow_id TEXT NOT NULL,
+		task_id INTEGER,
+		adapter TEXT NOT NULL,
+		status TEXT NOT NULL DEFAULT 'pending',
+		input TEXT,
+		output TEXT,
+		error TEXT,
+		started_at DATETIME,
+		completed_at DATETIME,
+		created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
+		updated_at DATETIME DEFAULT CURRENT_TIMESTAMP,
+		FOREIGN KEY (workflow_id) REFERENCES workflows(id)
+	);
+
+	CREATE INDEX IF NOT EXISTS idx_workflow_runs_workflow_id ON workflow_runs(workflow_id);
+	CREATE INDEX IF NOT EXISTS idx_workflow_runs_task_id ON workflow_runs(task_id);
+	CREATE INDEX IF NOT EXISTS idx_workflow_runs_status ON workflow_runs(status);
+
+	CREATE TABLE IF NOT EXISTS task_workflow_mapping (
+		task_id INTEGER PRIMARY KEY,
+		workflow_id TEXT NOT NULL,
+		run_id TEXT,
+		created_at DATETIME DEFAULT CURRENT_TIMESTAMP,
+		FOREIGN KEY (workflow_id) REFERENCES workflows(id)
+	);
+	`
+
+	_, err := db.Exec(schema)
+	return err
+}
+
+// Close closes the database connection.
+func (d *DB) Close() error {
+	return d.db.Close()
+}
+
+// SaveWorkflow saves or updates a workflow definition.
+func (d *DB) SaveWorkflow(w *Workflow) error {
+	// Upsert keyed on id: created_at keeps its original value, updated_at
+	// is bumped on every save.
+	_, err := d.db.Exec(`
+		INSERT INTO workflows (id, name, description, version, runtime, code, adapter, updated_at)
+		VALUES (?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
+		ON CONFLICT(id) DO UPDATE SET
+			name = excluded.name,
+			description = excluded.description,
+			version = excluded.version,
+			runtime = excluded.runtime,
+			code = excluded.code,
+			adapter = excluded.adapter,
+			updated_at = CURRENT_TIMESTAMP
+	`, w.ID, w.Name, w.Description, w.Version, w.Runtime, w.Code, w.Adapter)
+
+	return err
+}
+
+// GetWorkflow retrieves a workflow by ID.
+// Returns (nil, nil) when no workflow with the given ID exists.
+func (d *DB) GetWorkflow(id string) (*Workflow, error) {
+	row := d.db.QueryRow(`
+		SELECT id, name, description, version, runtime, code, adapter, created_at, updated_at
+		FROM workflows WHERE id = ?
+	`, id)
+
+	w := &Workflow{}
+	err := row.Scan(&w.ID, &w.Name, &w.Description, &w.Version, &w.Runtime, &w.Code, &w.Adapter, &w.CreatedAt, &w.UpdatedAt)
+	if err == sql.ErrNoRows {
+		return nil, nil
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	return w, nil
+}
+
+// ListWorkflows returns all workflows, most recently updated first.
+func (d *DB) ListWorkflows() ([]*Workflow, error) {
+	rows, err := d.db.Query(`
+		SELECT id, name, description, version, runtime, code, adapter, created_at, updated_at
+		FROM workflows ORDER BY updated_at DESC
+	`)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	var workflows []*Workflow
+	for rows.Next() {
+		w := &Workflow{}
+		if err := rows.Scan(&w.ID, &w.Name, &w.Description, &w.Version, &w.Runtime, &w.Code, &w.Adapter, &w.CreatedAt, &w.UpdatedAt); err != nil {
+			return nil, err
+		}
+		workflows = append(workflows, w)
+	}
+
+	return workflows, rows.Err()
+}
+
+// DeleteWorkflow deletes a workflow.
+// NOTE(review): runs referencing the workflow are left in place; the
+// FOREIGN KEY is only enforced if the connection enables foreign_keys.
+func (d *DB) DeleteWorkflow(id string) error {
+	_, err := d.db.Exec("DELETE FROM workflows WHERE id = ?", id)
+	return err
+}
+
+// SaveRun saves a workflow run.
+func (d *DB) SaveRun(r *WorkflowRun) error {
+	// Upsert: on conflict only the mutable fields (status, output, error,
+	// completed_at) are refreshed; task_id, input and started_at keep
+	// their originally inserted values.
+	_, err := d.db.Exec(`
+		INSERT INTO workflow_runs (id, workflow_id, task_id, adapter, status, input, output, error, started_at, completed_at, updated_at)
+		VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
+		ON CONFLICT(id) DO UPDATE SET
+			status = excluded.status,
+			output = excluded.output,
+			error = excluded.error,
+			completed_at = excluded.completed_at,
+			updated_at = CURRENT_TIMESTAMP
+	`, r.ID, r.WorkflowID, r.TaskID, r.Adapter, r.Status, r.Input, r.Output, r.Error, r.StartedAt, r.CompletedAt)
+
+	return err
+}
+
+// GetRun retrieves a workflow run by ID.
+// Returns (nil, nil) when no run with the given ID exists.
+func (d *DB) GetRun(id string) (*WorkflowRun, error) {
+	row := d.db.QueryRow(`
+		SELECT id, workflow_id, task_id, adapter, status, input, output, error, started_at, completed_at, created_at, updated_at
+		FROM workflow_runs WHERE id = ?
+	`, id)
+
+	r := &WorkflowRun{}
+	err := row.Scan(&r.ID, &r.WorkflowID, &r.TaskID, &r.Adapter, &r.Status, &r.Input, &r.Output, &r.Error, &r.StartedAt, &r.CompletedAt, &r.CreatedAt, &r.UpdatedAt)
+	if err == sql.ErrNoRows {
+		return nil, nil
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	return r, nil
+}
+
+// ListRuns returns workflow runs, optionally filtered.
+// All filter values are bound as placeholders, never interpolated into
+// the SQL text.
+func (d *DB) ListRuns(workflowID string, status string, limit int) ([]*WorkflowRun, error) {
+	query := "SELECT id, workflow_id, task_id, adapter, status, input, output, error, started_at, completed_at, created_at, updated_at FROM workflow_runs WHERE 1=1"
+	args := []any{}
+
+	if workflowID != "" {
+		query += " AND workflow_id = ?"
+		args = append(args, workflowID)
+	}
+
+	if status != "" {
+		query += " AND status = ?"
+		args = append(args, status)
+	}
+
+	query += " ORDER BY created_at DESC"
+
+	if limit > 0 {
+		query += " LIMIT ?"
+		args = append(args, limit)
+	}
+
+	rows, err := d.db.Query(query, args...)
+	if err != nil {
+		return nil, err
+	}
+	defer rows.Close()
+
+	var runs []*WorkflowRun
+	for rows.Next() {
+		r := &WorkflowRun{}
+		if err := rows.Scan(&r.ID, &r.WorkflowID, &r.TaskID, &r.Adapter, &r.Status, &r.Input, &r.Output, &r.Error, &r.StartedAt, &r.CompletedAt, &r.CreatedAt, &r.UpdatedAt); err != nil {
+			return nil, err
+		}
+		runs = append(runs, r)
+	}
+
+	return runs, rows.Err()
+}
+
+// GetRunByTaskID retrieves a workflow run by its associated task ID.
+// When several runs share a task, the most recently created one wins.
+// Returns (nil, nil) when no run references the task.
+func (d *DB) GetRunByTaskID(taskID int64) (*WorkflowRun, error) {
+	row := d.db.QueryRow(`
+		SELECT id, workflow_id, task_id, adapter, status, input, output, error, started_at, completed_at, created_at, updated_at
+		FROM workflow_runs WHERE task_id = ?
+		ORDER BY created_at DESC LIMIT 1
+	`, taskID)
+
+	r := &WorkflowRun{}
+	err := row.Scan(&r.ID, &r.WorkflowID, &r.TaskID, &r.Adapter, &r.Status, &r.Input, &r.Output, &r.Error, &r.StartedAt, &r.CompletedAt, &r.CreatedAt, &r.UpdatedAt)
+	if err == sql.ErrNoRows {
+		return nil, nil
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	return r, nil
+}
+
+// LinkTaskToWorkflow associates a task with a workflow.
+// Re-linking an already-mapped task overwrites the previous mapping.
+func (d *DB) LinkTaskToWorkflow(taskID int64, workflowID, runID string) error {
+	_, err := d.db.Exec(`
+		INSERT INTO task_workflow_mapping (task_id, workflow_id, run_id)
+		VALUES (?, ?, ?)
+		ON CONFLICT(task_id) DO UPDATE SET
+			workflow_id = excluded.workflow_id,
+			run_id = excluded.run_id
+	`, taskID, workflowID, runID)
+
+	return err
+}
+
+// GetTaskWorkflow retrieves the workflow associated with a task.
+// Returns empty strings (and a nil error) when no mapping exists.
+func (d *DB) GetTaskWorkflow(taskID int64) (workflowID, runID string, err error) {
+	row := d.db.QueryRow(`
+		SELECT workflow_id, run_id FROM task_workflow_mapping WHERE task_id = ?
+	`, taskID)
+
+	err = row.Scan(&workflowID, &runID)
+	if err == sql.ErrNoRows {
+		return "", "", nil
+	}
+
+	return
+}
+
+// GetPendingRuns returns runs that are still in progress.
+func (d *DB) GetPendingRuns() ([]*WorkflowRun, error) { + return d.ListRuns("", "running", 0) +} + +// UpdateRunStatus updates a run's status and optionally output/error. +func (d *DB) UpdateRunStatus(runID, status, output, errMsg string) error { + now := time.Now() + var completedAt *time.Time + if status == "completed" || status == "failed" || status == "canceled" { + completedAt = &now + } + + _, err := d.db.Exec(` + UPDATE workflow_runs + SET status = ?, output = ?, error = ?, completed_at = ?, updated_at = CURRENT_TIMESTAMP + WHERE id = ? + `, status, output, errMsg, completedAt, runID) + + return err +} diff --git a/extensions/ty-openworkflow/internal/state/state_test.go b/extensions/ty-openworkflow/internal/state/state_test.go new file mode 100644 index 00000000..c0a04640 --- /dev/null +++ b/extensions/ty-openworkflow/internal/state/state_test.go @@ -0,0 +1,442 @@ +package state + +import ( + "testing" + "time" +) + +func TestDB_OpenAndClose(t *testing.T) { + tmpDir := t.TempDir() + + db, err := Open(tmpDir) + if err != nil { + t.Fatalf("failed to open database: %v", err) + } + + if err := db.Close(); err != nil { + t.Fatalf("failed to close database: %v", err) + } +} + +func TestDB_SaveAndGetWorkflow(t *testing.T) { + tmpDir := t.TempDir() + db, err := Open(tmpDir) + if err != nil { + t.Fatalf("failed to open database: %v", err) + } + defer db.Close() + + workflow := &Workflow{ + ID: "test-workflow", + Name: "Test Workflow", + Description: "A test workflow", + Version: "1.0.0", + Runtime: "node", + Code: "function workflow() {}", + Adapter: "exec", + } + + // Save workflow + if err := db.SaveWorkflow(workflow); err != nil { + t.Fatalf("failed to save workflow: %v", err) + } + + // Get workflow + retrieved, err := db.GetWorkflow("test-workflow") + if err != nil { + t.Fatalf("failed to get workflow: %v", err) + } + + if retrieved == nil { + t.Fatal("retrieved workflow is nil") + } + if retrieved.ID != workflow.ID { + t.Errorf("expected ID '%s', got '%s'", 
workflow.ID, retrieved.ID) + } + if retrieved.Name != workflow.Name { + t.Errorf("expected Name '%s', got '%s'", workflow.Name, retrieved.Name) + } + if retrieved.Version != workflow.Version { + t.Errorf("expected Version '%s', got '%s'", workflow.Version, retrieved.Version) + } +} + +func TestDB_ListWorkflows(t *testing.T) { + tmpDir := t.TempDir() + db, err := Open(tmpDir) + if err != nil { + t.Fatalf("failed to open database: %v", err) + } + defer db.Close() + + // Add some workflows + for i := 1; i <= 3; i++ { + workflow := &Workflow{ + ID: "workflow-" + string(rune('0'+i)), + Name: "Workflow " + string(rune('0'+i)), + Version: "1.0.0", + Runtime: "node", + Code: "function workflow() {}", + Adapter: "exec", + } + if err := db.SaveWorkflow(workflow); err != nil { + t.Fatalf("failed to save workflow: %v", err) + } + } + + // List workflows + workflows, err := db.ListWorkflows() + if err != nil { + t.Fatalf("failed to list workflows: %v", err) + } + + if len(workflows) != 3 { + t.Errorf("expected 3 workflows, got %d", len(workflows)) + } +} + +func TestDB_DeleteWorkflow(t *testing.T) { + tmpDir := t.TempDir() + db, err := Open(tmpDir) + if err != nil { + t.Fatalf("failed to open database: %v", err) + } + defer db.Close() + + workflow := &Workflow{ + ID: "to-delete", + Name: "To Delete", + Version: "1.0.0", + Runtime: "node", + Code: "function workflow() {}", + Adapter: "exec", + } + + if err := db.SaveWorkflow(workflow); err != nil { + t.Fatalf("failed to save workflow: %v", err) + } + + if err := db.DeleteWorkflow("to-delete"); err != nil { + t.Fatalf("failed to delete workflow: %v", err) + } + + // Should not exist anymore + retrieved, err := db.GetWorkflow("to-delete") + if err != nil { + t.Fatalf("failed to get workflow: %v", err) + } + if retrieved != nil { + t.Error("workflow should have been deleted") + } +} + +func TestDB_SaveAndGetRun(t *testing.T) { + tmpDir := t.TempDir() + db, err := Open(tmpDir) + if err != nil { + t.Fatalf("failed to open database: 
%v", err) + } + defer db.Close() + + // First save a workflow + workflow := &Workflow{ + ID: "workflow-1", + Name: "Workflow 1", + Version: "1.0.0", + Runtime: "node", + Code: "function workflow() {}", + Adapter: "exec", + } + if err := db.SaveWorkflow(workflow); err != nil { + t.Fatalf("failed to save workflow: %v", err) + } + + // Save a run + run := &WorkflowRun{ + ID: "run-123", + WorkflowID: "workflow-1", + TaskID: 42, + Adapter: "exec", + Status: "running", + Input: `{"key":"value"}`, + StartedAt: time.Now(), + } + + if err := db.SaveRun(run); err != nil { + t.Fatalf("failed to save run: %v", err) + } + + // Get run + retrieved, err := db.GetRun("run-123") + if err != nil { + t.Fatalf("failed to get run: %v", err) + } + + if retrieved == nil { + t.Fatal("retrieved run is nil") + } + if retrieved.ID != run.ID { + t.Errorf("expected ID '%s', got '%s'", run.ID, retrieved.ID) + } + if retrieved.TaskID != run.TaskID { + t.Errorf("expected TaskID %d, got %d", run.TaskID, retrieved.TaskID) + } + if retrieved.Status != run.Status { + t.Errorf("expected Status '%s', got '%s'", run.Status, retrieved.Status) + } +} + +func TestDB_ListRuns(t *testing.T) { + tmpDir := t.TempDir() + db, err := Open(tmpDir) + if err != nil { + t.Fatalf("failed to open database: %v", err) + } + defer db.Close() + + // Save a workflow + workflow := &Workflow{ + ID: "workflow-1", + Name: "Workflow 1", + Version: "1.0.0", + Runtime: "node", + Code: "function workflow() {}", + Adapter: "exec", + } + if err := db.SaveWorkflow(workflow); err != nil { + t.Fatalf("failed to save workflow: %v", err) + } + + // Save multiple runs + statuses := []string{"running", "completed", "running"} + for i, status := range statuses { + run := &WorkflowRun{ + ID: "run-" + string(rune('0'+i+1)), + WorkflowID: "workflow-1", + Adapter: "exec", + Status: status, + StartedAt: time.Now(), + } + if err := db.SaveRun(run); err != nil { + t.Fatalf("failed to save run: %v", err) + } + } + + // List all runs + runs, err := 
db.ListRuns("", "", 0) + if err != nil { + t.Fatalf("failed to list runs: %v", err) + } + if len(runs) != 3 { + t.Errorf("expected 3 runs, got %d", len(runs)) + } + + // List by status + runs, err = db.ListRuns("", "running", 0) + if err != nil { + t.Fatalf("failed to list runs: %v", err) + } + if len(runs) != 2 { + t.Errorf("expected 2 running runs, got %d", len(runs)) + } + + // List with limit + runs, err = db.ListRuns("", "", 1) + if err != nil { + t.Fatalf("failed to list runs: %v", err) + } + if len(runs) != 1 { + t.Errorf("expected 1 run with limit, got %d", len(runs)) + } +} + +func TestDB_UpdateRunStatus(t *testing.T) { + tmpDir := t.TempDir() + db, err := Open(tmpDir) + if err != nil { + t.Fatalf("failed to open database: %v", err) + } + defer db.Close() + + // Save a workflow + workflow := &Workflow{ + ID: "workflow-1", + Name: "Workflow 1", + Version: "1.0.0", + Runtime: "node", + Code: "function workflow() {}", + Adapter: "exec", + } + if err := db.SaveWorkflow(workflow); err != nil { + t.Fatalf("failed to save workflow: %v", err) + } + + // Save a run + run := &WorkflowRun{ + ID: "run-to-update", + WorkflowID: "workflow-1", + Adapter: "exec", + Status: "running", + StartedAt: time.Now(), + } + if err := db.SaveRun(run); err != nil { + t.Fatalf("failed to save run: %v", err) + } + + // Update status + output := `{"result":"success"}` + if err := db.UpdateRunStatus("run-to-update", "completed", output, ""); err != nil { + t.Fatalf("failed to update run status: %v", err) + } + + // Verify update + retrieved, err := db.GetRun("run-to-update") + if err != nil { + t.Fatalf("failed to get run: %v", err) + } + + if retrieved.Status != "completed" { + t.Errorf("expected status 'completed', got '%s'", retrieved.Status) + } + if retrieved.Output != output { + t.Errorf("expected output '%s', got '%s'", output, retrieved.Output) + } + if retrieved.CompletedAt == nil { + t.Error("completed_at should be set") + } +} + +func TestDB_LinkTaskToWorkflow(t *testing.T) { 
+ tmpDir := t.TempDir() + db, err := Open(tmpDir) + if err != nil { + t.Fatalf("failed to open database: %v", err) + } + defer db.Close() + + // Save a workflow + workflow := &Workflow{ + ID: "workflow-1", + Name: "Workflow 1", + Version: "1.0.0", + Runtime: "node", + Code: "function workflow() {}", + Adapter: "exec", + } + if err := db.SaveWorkflow(workflow); err != nil { + t.Fatalf("failed to save workflow: %v", err) + } + + // Link task to workflow + if err := db.LinkTaskToWorkflow(123, "workflow-1", "run-abc"); err != nil { + t.Fatalf("failed to link task: %v", err) + } + + // Get task workflow + workflowID, runID, err := db.GetTaskWorkflow(123) + if err != nil { + t.Fatalf("failed to get task workflow: %v", err) + } + + if workflowID != "workflow-1" { + t.Errorf("expected workflow ID 'workflow-1', got '%s'", workflowID) + } + if runID != "run-abc" { + t.Errorf("expected run ID 'run-abc', got '%s'", runID) + } +} + +func TestDB_GetRunByTaskID(t *testing.T) { + tmpDir := t.TempDir() + db, err := Open(tmpDir) + if err != nil { + t.Fatalf("failed to open database: %v", err) + } + defer db.Close() + + // Save a workflow + workflow := &Workflow{ + ID: "workflow-1", + Name: "Workflow 1", + Version: "1.0.0", + Runtime: "node", + Code: "function workflow() {}", + Adapter: "exec", + } + if err := db.SaveWorkflow(workflow); err != nil { + t.Fatalf("failed to save workflow: %v", err) + } + + // Save a run with task ID + run := &WorkflowRun{ + ID: "run-with-task", + WorkflowID: "workflow-1", + TaskID: 456, + Adapter: "exec", + Status: "running", + StartedAt: time.Now(), + } + if err := db.SaveRun(run); err != nil { + t.Fatalf("failed to save run: %v", err) + } + + // Get run by task ID + retrieved, err := db.GetRunByTaskID(456) + if err != nil { + t.Fatalf("failed to get run: %v", err) + } + + if retrieved == nil { + t.Fatal("retrieved run is nil") + } + if retrieved.ID != "run-with-task" { + t.Errorf("expected run ID 'run-with-task', got '%s'", retrieved.ID) + } +} + 
+func TestDB_GetPendingRuns(t *testing.T) { + tmpDir := t.TempDir() + db, err := Open(tmpDir) + if err != nil { + t.Fatalf("failed to open database: %v", err) + } + defer db.Close() + + // Save a workflow + workflow := &Workflow{ + ID: "workflow-1", + Name: "Workflow 1", + Version: "1.0.0", + Runtime: "node", + Code: "function workflow() {}", + Adapter: "exec", + } + if err := db.SaveWorkflow(workflow); err != nil { + t.Fatalf("failed to save workflow: %v", err) + } + + // Save runs with different statuses + statuses := []string{"running", "completed", "running", "failed"} + for i, status := range statuses { + run := &WorkflowRun{ + ID: "run-" + string(rune('a'+i)), + WorkflowID: "workflow-1", + Adapter: "exec", + Status: status, + StartedAt: time.Now(), + } + if err := db.SaveRun(run); err != nil { + t.Fatalf("failed to save run: %v", err) + } + } + + // Get pending (running) runs + pending, err := db.GetPendingRuns() + if err != nil { + t.Fatalf("failed to get pending runs: %v", err) + } + + if len(pending) != 2 { + t.Errorf("expected 2 pending runs, got %d", len(pending)) + } +}