package run

import (
	"bytes"
	"context"
	"errors"
	"fmt"
	"log/slog"
	"strings"
	"text/template"

	"gitea.stevedudenhoeffer.com/steve/majordomo/agent"
	"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
)

// The multi-step phase runner. A phased RunnableAgent (ra.Phases non-empty) runs
// its phases in order; each phase is a fresh majordomo agent loop (or a single
// bare LLM call for IsRunFunc phases) with its own template-expanded system
// prompt, model tier, step cap, and tool subset. Phase outputs feed later phases
// through {{.<PhaseName>}} template variables; {{.Query}} is the original input.
// The final phase's output is the run's output.
//
// Ported from mort's agentexec pipeline so the executus kernel — which already
// carries RunnableAgent.Phases as a DTO — actually EXECUTES them (it previously
// ignored the slice and ran a single loop with the base prompt). It reuses the
// shared run machinery built once in Run: the same stepObserver (so audit/steps/
// critic accumulate across every phase), the same critic steer, and the same
// compaction option.
//
// Semantics preserved from mort's pipeline:
//   - phases run sequentially; ctx cancellation between phases aborts the run.
//   - IsRunFunc = one bare LLM call, no tools, no loop.
//   - Optional phases swallow errors and substitute FallbackMessage.
//   - a non-optional phase that merely exhausts its step/tool budget is NOT fatal:
//     its partial transcript is salvaged and the pipeline continues (so a long
//     verify phase never discards every earlier phase's work). A hard error
//     (cancellation, model failure) still aborts.
//   - per-phase ModelTier resolve failures fall back to the base model with a WARN.
// // Deliberately NOT carried over (kernel is leaner than mort's legacy pipeline): // the legacy `submit` capture tool (the kernel relies on majordomo's // no-tool-call-is-final-answer termination, like its single-loop path), and the // critic's dynamic iteration ceiling (per-phase caps are fixed at phase start — // the run-level critic's steer + hard deadline still apply across phases). // phaseDeps carries the per-run state the phase runner shares with Run: the base // model + tier, the full decorated toolbox (filtered per phase), the base step // cap, the shared agent options (tool-error limits + step observer + compactor), // the critic/session steer, and the audit recorder (phase events). type phaseDeps struct { baseModel llm.Model baseTier string baseToolbox *llm.Toolbox baseMaxIter int sharedOpts []agent.Option steer func() []llm.Message rec RunRecorder } // runPhases executes ra.Phases sequentially and returns a synthetic agent.Result // whose Output is the final phase's output, with Usage aggregated across phases // and Messages set to the last phase's transcript (for the PostRun hook). A hard // (non-optional, non-budget) phase failure returns the error. func (e *Executor) runPhases(runCtx context.Context, ra RunnableAgent, deps phaseDeps, query string, images []llm.ImagePart) (*agent.Result, error) { outputs := make(map[string]string, len(ra.Phases)) var lastResult *agent.Result var lastOutput string var totalUsage llm.Usage for i, phase := range ra.Phases { // A killed/timed-out/cancelled run must not start its next phase. 
if err := runCtx.Err(); err != nil { return lastResult, err } instructions := expandPhaseTemplate(phase.SystemPrompt, query, outputs) if deps.rec != nil { deps.rec.LogEvent("phase_start", map[string]any{"phase": phase.Name}) } output, res, err := e.runOnePhase(runCtx, ra, deps, phase, instructions, query, images) if res != nil { lastResult = res totalUsage = addUsage(totalUsage, res.Usage) } if err != nil { isLast := i == len(ra.Phases)-1 switch { case phase.Optional: output = phase.FallbackMessage if output == "" { output = fmt.Sprintf("(Phase %q encountered an error -- proceeding without its results)", phase.Name) } slog.Warn("run: optional pipeline phase failed", "agent", ra.Name, "phase", phase.Name, "error", err) if deps.rec != nil { deps.rec.LogEvent("phase_failed_optional", map[string]any{"phase": phase.Name, "error": err.Error()}) } case isPhaseBudgetExhaustion(err) && (!isLast || strings.TrimSpace(output) != ""): // Soft stop: the phase ran out of its step/tool budget before // composing a final answer. Not fatal — it did real work // (runOnePhase salvaged its partial transcript into output), and // aborting would discard every completed phase before it. Degrade // gracefully and continue. 
if strings.TrimSpace(output) == "" { output = fmt.Sprintf("(Phase %q reached its step budget before producing a consolidated result; continuing with its partial findings.)", phase.Name) } else { output += fmt.Sprintf("\n\n(Note: phase %q reached its step budget before fully completing; the above is its partial output.)", phase.Name) } slog.Warn("run: pipeline phase exhausted its budget; salvaging partial output and continuing", "agent", ra.Name, "phase", phase.Name, "last_phase", isLast, "error", err) if deps.rec != nil { deps.rec.LogEvent("phase_budget_exhausted", map[string]any{"phase": phase.Name, "error": err.Error(), "last_phase": isLast}) } default: return lastResult, fmt.Errorf("pipeline phase %q: %w", phase.Name, err) } } outputs[phase.Name] = output lastOutput = output } // Synthesize the run result: the final phase's output, usage aggregated over // all phases, and the last phase's transcript for the PostRun hook. if lastResult == nil { lastResult = &agent.Result{} } lastResult.Output = lastOutput lastResult.Usage = totalUsage return lastResult, nil } // runOnePhase runs a single phase: a bare LLM call for IsRunFunc phases, a fresh // agent loop otherwise. Returns the phase output, the loop result (nil for a // failed bare call), and any error. On a budget-exhaustion error the loop's // partial transcript is salvaged into the returned output. func (e *Executor) runOnePhase(runCtx context.Context, ra RunnableAgent, deps phaseDeps, phase Phase, instructions, query string, images []llm.ImagePart) (string, *agent.Result, error) { model := e.phaseModel(runCtx, deps, ra, phase) // The phase's expanded instructions are the system prompt (with the platform // header so tools keep their run ids); the original query is the user message. system := e.systemPromptWithBody(instructions) if phase.IsRunFunc { // Bare LLM call: no tool loop, no tools array (some models 400 on an empty // tools list). The response still lands in the audit token tally. 
msgs := []llm.Message{phaseUserMessage(query, images)} resp, err := model.Generate(runCtx, llm.Request{System: system, Messages: msgs}) if err != nil { return "", nil, fmt.Errorf("phase %q model call: %w", phase.Name, err) } if deps.rec != nil { deps.rec.OnStep(1, resp) } return resp.Text(), &agent.Result{ Output: resp.Text(), Usage: resp.Usage, Messages: append(msgs, resp.Message()), }, nil } toolbox := filterToolbox(deps.baseToolbox, phase.Tools) maxIter := phase.MaxIterations if maxIter <= 0 { maxIter = deps.baseMaxIter } // Per-phase opts: a fixed step ceiling for this phase (the critic's dynamic // ceiling is intentionally not propagated to phases) + the phase toolbox, on // top of the shared opts (tool-error limits, step observer, compactor). opts := append([]agent.Option{ agent.WithToolbox(toolbox), agent.WithMaxSteps(maxIter), }, deps.sharedOpts...) ag := agent.New(model, system, opts...) res, runErr := runAgent(runCtx, ag, query, images, agent.WithSteer(deps.steer)) output := "" if res != nil { output = res.Output } // Budget/guard exhaustion leaves a usable partial transcript but an empty // final answer; salvage the narrated work so the pipeline can carry it forward. if runErr != nil && isPhaseBudgetExhaustion(runErr) { if salvaged := salvagePhaseTranscript(res); salvaged != "" { output = salvaged } } return output, res, runErr } // phaseModel resolves the phase's model tier, falling back to the base model on // an empty tier or a resolution failure (WARN — visible, non-fatal). 
func (e *Executor) phaseModel(ctx context.Context, deps phaseDeps, ra RunnableAgent, phase Phase) llm.Model { if phase.ModelTier == "" { return deps.baseModel } _, m, err := e.cfg.Models(ctx, phase.ModelTier) if err != nil || m == nil { slog.Warn("run: pipeline phase model resolve failed; using base model", "agent", ra.Name, "phase", phase.Name, "tier", phase.ModelTier, "error", err) return deps.baseModel } return m } // isPhaseBudgetExhaustion reports whether err is a soft budget/guard stop (the // loop hit its step cap or tripped a tool-error guard) — which leaves a usable // partial transcript — as opposed to a hard error (cancellation, model failure). func isPhaseBudgetExhaustion(err error) bool { return errors.Is(err, agent.ErrMaxSteps) || errors.Is(err, agent.ErrToolLoop) } // salvagePhaseTranscript reconstructs a best-effort phase output from a loop that // ended without a final answer: the assistant's narrated text across every step, // tail-trimmed to a bound. Returns "" when the model wrote no prose. func salvagePhaseTranscript(res *agent.Result) string { if res == nil { return "" } var b strings.Builder for _, step := range res.Steps { if step.Response == nil { continue } if t := strings.TrimSpace(step.Response.Text()); t != "" { if b.Len() > 0 { b.WriteString("\n\n") } b.WriteString(t) } } out := strings.TrimSpace(b.String()) const maxSalvage = 8000 if len(out) > maxSalvage { out = "...(earlier reasoning trimmed)...\n" + out[len(out)-maxSalvage:] } return out } // phaseUserMessage builds a phase's user message: the original query text plus // any inline images. Mirrors the single-loop multimodal seeding in runAgent. func phaseUserMessage(query string, images []llm.ImagePart) llm.Message { if len(images) == 0 { return llm.UserText(query) } parts := make([]llm.Part, 0, len(images)+1) if strings.TrimSpace(query) != "" { parts = append(parts, llm.Text(query)) } for _, img := range images { parts = append(parts, img) } return llm.UserParts(parts...) 
} // expandPhaseTemplate applies Go text/template substitution to a phase prompt, // replacing {{.Query}} with the original query and {{.}} with a prior // phase's output. Returns the original string unchanged if parsing or execution // fails (best-effort, not fatal). func expandPhaseTemplate(tmpl, query string, priorOutputs map[string]string) string { t, err := template.New("phase").Option("missingkey=zero").Parse(tmpl) if err != nil { return tmpl } data := map[string]string{"Query": query} for k, v := range priorOutputs { data[k] = v } var buf bytes.Buffer if err := t.Execute(&buf, data); err != nil { return tmpl } return buf.String() } // filterToolbox returns a new toolbox restricted to the named tools (preserving // palette order). Empty names = the full palette. Unknown names are skipped with // a WARN — a typo'd phase tool list should not abort a run mid-pipeline. func filterToolbox(box *llm.Toolbox, names []string) *llm.Toolbox { out := llm.NewToolbox(box.Name()) if len(names) == 0 { for _, t := range box.Tools() { _ = out.Add(t) } return out } for _, name := range names { t, ok := box.Get(name) if !ok { slog.Warn("run: pipeline phase references unknown tool; skipping", "tool", name) continue } if err := out.Add(t); err != nil { slog.Warn("run: pipeline phase tool duplicated; skipping", "tool", name, "error", err) } } return out } // addUsage sums two llm.Usage tallies field-by-field so a phased run reports the // total tokens across all phases. func addUsage(a, b llm.Usage) llm.Usage { a.InputTokens += b.InputTokens a.OutputTokens += b.OutputTokens a.CacheReadTokens += b.CacheReadTokens a.CacheWriteTokens += b.CacheWriteTokens a.ReasoningTokens += b.ReasoningTokens return a }