Files
executus/run/executor.go
T
steve 0dd2ced717
executus CI / test (pull_request) Successful in 48s
fix(run): address gadfly review of the phases PR
Real findings from the consensus review (37 raw; many devstral dups/noise):

- Optional/budget-salvage branches no longer swallow a context
  cancellation / deadline / critic-kill: such errors return immediately so
  the run is classified cancelled/timeout/killed, not "ok" with a fallback.
  (the most serious finding — an Optional final phase could mask a killed run)
- IsRunFunc bare phase now feeds the SHARED step observer (not just the
  audit recorder), so the critic's activity clock + Result.Steps see it —
  a long synthesize phase no longer looks idle to the critic.
- phaseModel returns the resolver's enriched (usage-attribution) context and
  the phase's calls use it, mirroring the single-loop path (non-base-tier
  phases were mis-attributed).
- salvagePhaseTranscript trims the tail on a rune boundary (was a raw byte
  slice that could split a UTF-8 rune); maxSalvage is now a named const with
  rationale.
- expandPhaseTemplate logs a WARN on parse/execute failure instead of
  silently returning the unexpanded template; documented the phase-name
  identifier requirement + the "Query" shadow.
- removed the dead phaseDeps.baseTier field.
- extracted multimodalUserMessage, shared by runAgent + the phase runner
  (was duplicated image-folding).
- aggregated phase usage is stamped onto the result even on a hard-error
  return; TrimSpace computed once; filterToolbox returns the base toolbox
  as-is for the empty-names (full-palette) case instead of copying;
  phaseModel WARN no longer prints error=<nil>.

New test: Optional phase does not swallow a cancellation. Full ./... green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-29 15:44:04 -04:00

500 lines
19 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package run
import (
"context"
"errors"
"fmt"
"time"
"gitea.stevedudenhoeffer.com/steve/majordomo/agent"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/executus/compact"
"gitea.stevedudenhoeffer.com/steve/executus/deliver"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
// ModelResolver resolves a tier alias or concrete spec to a usable llm.Model
// and an enriched context (for usage attribution). model.ParseModelForContext
// satisfies it.
type ModelResolver func(ctx context.Context, tier string) (context.Context, llm.Model, error)
// Defaults are the executor's fallback caps and loop guards, applied per run
// when the RunnableAgent leaves a field zero.
type Defaults struct {
MaxIterations int // tool-dispatch steps; default 12
MaxRuntime time.Duration // wall-clock per run; default 60s
FallbackTier string // tier when the agent's is empty; default "fast"
MaxConsecutiveToolErrors int // loop guard; default 3
MaxSameToolCallRepeats int // retry-storm guard; default 3
CompactionThresholdRatio float64 // fraction of model context to compact at; default 0.7
CriticSoftTimeout time.Duration // idle window before the critic wakes; default 90s
}
func (d Defaults) withFallbacks() Defaults {
if d.MaxIterations <= 0 {
d.MaxIterations = 12
}
if d.MaxRuntime <= 0 {
d.MaxRuntime = 60 * time.Second
}
if d.FallbackTier == "" {
d.FallbackTier = "fast"
}
if d.MaxConsecutiveToolErrors <= 0 {
d.MaxConsecutiveToolErrors = 3
}
if d.MaxSameToolCallRepeats <= 0 {
d.MaxSameToolCallRepeats = 3
}
if d.CompactionThresholdRatio <= 0 {
d.CompactionThresholdRatio = 0.7
}
if d.CriticSoftTimeout <= 0 {
d.CriticSoftTimeout = 90 * time.Second
}
return d
}
// Config wires an Executor. Registry + Models are required; everything else is
// optional and nil-safe — the zero Config beyond those yields a bounded,
// in-memory run with no persistence/audit/budget/critic/delegation/compaction
// (gadfly's case).
type Config struct {
Registry tool.Registry
Models ModelResolver
Defaults Defaults
Ports Ports
// Compactor mints the per-run context-compaction hook. nil disables
// compaction. ContextTokens resolves a tier's model context-window (for
// the compaction threshold); nil — or a zero return — also disables it.
Compactor compact.CompactorFactory
ContextTokens func(tier string) int
// SystemHeader is an optional platform header prepended to every agent's
// system prompt.
SystemHeader string
}
// Executor runs a RunnableAgent through majordomo's agent loop with the wired
// Ports. Construct with New; safe for concurrent use across runs.
type Executor struct {
cfg Config
}
// New builds an Executor. It panics if Registry or Models is nil — those are
// structural, not runtime, errors.
func New(cfg Config) *Executor {
if cfg.Registry == nil || cfg.Models == nil {
panic("run.New: Registry and Models are required")
}
cfg.Defaults = cfg.Defaults.withFallbacks()
return &Executor{cfg: cfg}
}
// Result is one run's outcome. Err carries the run failure (if any); the other
// fields are populated best-effort even on error (partial output/steps/usage).
type Result struct {
RunID string
Output string
Steps []tool.Step
Usage llm.Usage
Err error
// PostRunResult carries artifacts produced by a SessionToolFactory's PostRun
// hook (rendered images, files). nil when no factory was set or PostRun
// returned nil. The host delivers these (e.g. mort's chat API / Discord).
PostRunResult *tool.PostRunResult
}
// Run executes ra with the given invocation + input and returns the Result. It
// never propagates a panic; failures surface in Result.Err (a top-level recover
// converts any panic — including from a host Port — into a run error).
func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocation, input string) (res Result) {
started := time.Now()
res = Result{RunID: inv.RunID}
// Enforce the no-panic contract: a panic anywhere in the run (incl. a host
// Critic/Audit/Palette callback on the main goroutine) becomes Result.Err
// rather than unwinding into the caller.
defer func() {
if r := recover(); r != nil {
res.Err = fmt.Errorf("run.Executor: recovered panic: %v", r)
}
}()
tier := ra.ModelTier
if tier == "" {
tier = e.cfg.Defaults.FallbackTier
}
maxIter := ra.MaxIterations
if maxIter <= 0 {
maxIter = e.cfg.Defaults.MaxIterations
}
maxRuntime := ra.MaxRuntime
if maxRuntime <= 0 {
maxRuntime = e.cfg.Defaults.MaxRuntime
}
// Budget gate (pre-run): a rejected run makes no model call.
if e.cfg.Ports.Budget != nil {
if err := e.cfg.Ports.Budget.Check(ctx, inv.CallerID); err != nil {
res.Err = err
return res
}
}
// Resolve the model (enriches ctx for usage attribution).
modelCtx, model, err := e.cfg.Models(ctx, tier)
if err != nil {
res.Err = fmt.Errorf("resolve model %q: %w", tier, err)
return res
}
if model == nil {
// A resolver returning (ctx, nil, nil) would otherwise nil-panic inside
// the agent loop; surface it as a clean error (Run never panics out).
res.Err = fmt.Errorf("resolve model %q: resolver returned a nil model", tier)
return res
}
ctx = modelCtx
// Audit start (optional). The recorder satisfies RunTally; stamp it on the
// invocation so a self-status tool can read live progress.
info := RunInfo{
RunID: inv.RunID,
SubjectID: ra.ID,
Name: ra.Name,
CallerID: inv.CallerID,
ChannelID: inv.ChannelID,
ParentRunID: inv.ParentRunID,
Inputs: inv.SkillInputs,
StartedAt: started,
MaxIterations: maxIter,
}
var rec RunRecorder
var stateAcc *RunStateAccessor
if e.cfg.Ports.Audit != nil {
rec = e.cfg.Ports.Audit.StartRun(ctx, info)
}
if rec != nil {
stateAcc = NewRunStateAccessor(rec, maxIter, 0, started)
inv.RunState = stateAcc
}
// Steer mailbox: lets session tools (via inv.AttachImages) feed multimodal
// messages into the running conversation before its next step. Created BEFORE
// the toolbox build so any tool's handler captures the live AttachImages seam.
mailbox := &steerMailbox{}
inv.AttachImages = (&runSession{mailbox: mailbox}).AttachImages
// Build the toolbox from the agent's low-level tools.
toolbox, err := e.cfg.Registry.Build(ra.LowLevelTools, inv, tool.Visibility("private"), nil)
if err != nil {
res.Err = fmt.Errorf("build toolbox: %w", err)
e.finishAudit(ctx, rec, "error", res, started, res.Err)
return res
}
// Add skill__/agent__ delegation tools from the agent's palette (nil-safe:
// no PaletteSource or empty palette → no delegation tools).
if err := addDelegationTools(toolbox, ra, inv, e.cfg.Ports.Palette); err != nil {
res.Err = fmt.Errorf("build delegation tools: %w", err)
e.finishAudit(ctx, rec, "error", res, started, res.Err)
return res
}
// Per-invocation ExtraTools + a SessionToolFactory's per-run tools, added on
// top of the agent's palette. The factory closes over the live session (the
// AttachImages mailbox); its PostRun hook (held for after the run) produces
// artifacts attached to res.PostRunResult, and its Cleanup is deferred. All
// nil-safe.
for _, t := range inv.ExtraTools {
if err := toolbox.Add(t); err != nil {
res.Err = fmt.Errorf("add extra tool: %w", err)
e.finishAudit(ctx, rec, "error", res, started, res.Err)
return res
}
}
var postRun func(ctx context.Context, transcript []llm.Message, output string, runErr error) *tool.PostRunResult
if inv.SessionToolFactory != nil {
st := inv.SessionToolFactory(&runSession{mailbox: mailbox})
if st.Cleanup != nil {
defer safeCleanup(st.Cleanup) // panic-isolated, like runPostRun
}
for _, t := range st.Tools {
if err := toolbox.Add(t); err != nil {
res.Err = fmt.Errorf("add session tool: %w", err)
e.finishAudit(ctx, rec, "error", res, started, res.Err)
return res
}
}
postRun = st.PostRun
}
// Run context: bound by MaxRuntime, detached from the caller's deadline so a
// lane/queue wait doesn't eat the run budget (mort's V10 lesson). Caller
// cancellation still propagates via MergeCancellation. Created BEFORE the
// step observer so the observer forwards the merged run context (not a
// possibly-cancelled caller ctx) to OnStep consumers.
// MaxRuntime stays a WithTimeout so its DeadlineExceeded propagates through the
// child chain (→ "timeout"), preserving the run's-own-timeout vs caller-cancel
// distinction. A NESTED cause-carrying layer lets a critic kill surface as a
// distinct "killed" without disturbing that: only an ErrCriticKill cause is
// consulted in statusFor; a generic run error or a caller cancel is classified
// by the run error itself.
timeoutCtx, cancelTimeout := context.WithTimeout(context.WithoutCancel(ctx), maxRuntime)
defer cancelTimeout()
runCtx, cancelCause := context.WithCancelCause(timeoutCtx)
defer cancelCause(nil)
runCtx, mergeCancel := MergeCancellation(runCtx, ctx)
defer mergeCancel()
// Critic (optional): monitors the run for a stall, can nudge/extend/kill via
// its host Escalator. Its hard deadline is bound to runCtx (cancel on pass).
// nil-safe: no-op when no critic is configured or the agent doesn't enable it.
critic, stopCritic := e.startCritic(runCtx, cancelCause, ra, info)
defer stopCritic()
// Step instrumentation: accumulate Result.Steps + fire inv.OnStep, feed the
// audit recorder, and keep the live iteration counter fresh. majordomo's
// step observer hands us each completed iteration; we zip the model's tool
// calls with their executed results PAIRWISE — a result without a matching
// call (or a call without a result) is skipped rather than recorded as an
// empty-name "ghost" step.
emitter := newStepEmitter(inv.OnStep)
stepObserver := func(s agent.Step) {
if stateAcc != nil {
stateAcc.SetIteration(s.Index)
}
if rec != nil {
rec.OnStep(s.Index, s.Response)
}
critic.recordStep(s.Index, s.Response) // keep the critic's activity clock fresh + carry the step payload
var calls []llm.ToolCall
if s.Response != nil {
calls = s.Response.ToolCalls
}
n := len(s.Results)
if len(calls) < n {
n = len(calls)
}
for i := 0; i < n; i++ {
call, r := calls[i], s.Results[i]
critic.recordToolStart(call.Name, string(call.Arguments))
emitter.toolStart(runCtx, call.Name, call.Arguments)
emitter.toolEnd(runCtx, call, r.Content, r.IsError)
if rec != nil {
rec.OnTool(call, r.Content)
}
}
}
// Shared agent options used by BOTH the single-loop path and every phase: the
// tool-error guards, the step observer, and optional compaction. The toolbox +
// step ceiling are NOT shared (they vary per phase), so they're added per path.
sharedOpts := []agent.Option{
agent.WithToolErrorLimits(e.cfg.Defaults.MaxConsecutiveToolErrors, e.cfg.Defaults.MaxSameToolCallRepeats),
agent.WithStepObserver(stepObserver),
}
if e.cfg.Compactor != nil && e.cfg.ContextTokens != nil {
if threshold := e.compactionThreshold(tier); threshold > 0 {
// Forward compaction events to the audit log (makes the
// CompactionEvent doc's "logged to the run trace" promise true).
var onFire func(compact.CompactionEvent)
if rec != nil {
onFire = func(ev compact.CompactionEvent) {
rec.LogEvent("compaction_fired", map[string]any{
"messages_before": ev.MessagesBefore,
"messages_after": ev.MessagesAfter,
"tokens_before": ev.TokensBefore,
"tokens_after": ev.TokensAfter,
})
}
}
sharedOpts = append(sharedOpts, agent.WithCompactor(e.cfg.Compactor(threshold, onFire)))
}
}
// Stage non-image input attachments (audio/PDF/binary) into the host file
// store and fold an [ATTACHED FILES] descriptor into the prompt so the agent
// can reach them by file_id. No-op when Ports.InputFiles is nil or there are
// no files. Done after the model/toolbox build but before the loop, so the
// descriptor rides the very first user turn.
input = e.stageInputFiles(runCtx, inv.RunID, ra.ID, inv.InputFiles, input)
// One WithSteer drains BOTH the session mailbox (a tool's AttachImages) and
// the critic's nudges before each step.
steer := func() []llm.Message { return append(mailbox.drain(), critic.drainSteer()...) }
var runRes *agent.Result
var runErr error
if len(ra.Phases) == 0 {
// Single-loop run: the agent's base prompt + full toolbox, with the
// critic's DYNAMIC step ceiling (WithMaxStepsFunc, so it can raise a
// healthy-but-long run's budget mid-flight; falls back to maxIter).
opts := append([]agent.Option{
agent.WithToolbox(toolbox),
critic.maxStepsOption(maxIter),
}, sharedOpts...)
ag := agent.New(model, e.systemPrompt(ra), opts...)
runRes, runErr = runAgent(runCtx, ag, input, inv.Images, agent.WithSteer(steer))
} else {
// Multi-phase pipeline: each phase runs its own prompt/tier/tools/step-cap
// sequentially, threading outputs through {{.<PhaseName>}} templates. Reuses
// the shared opts so audit/steps/critic-steer accumulate across every phase.
// (Per-phase step caps are fixed — the critic's dynamic ceiling is not
// propagated to phases — but its steer + hard deadline still apply.)
runRes, runErr = e.runPhases(runCtx, ra, phaseDeps{
baseModel: model,
baseToolbox: toolbox,
baseMaxIter: maxIter,
sharedOpts: sharedOpts,
stepObserver: stepObserver,
steer: steer,
rec: rec,
}, input, inv.Images)
}
status := statusFor(runCtx, runErr)
if runRes != nil {
res.Output = runRes.Output
res.Usage = runRes.Usage
}
res.Steps = emitter.snapshot()
res.Err = runErr
// PostRun: hand the SessionToolFactory's hook the full transcript (populated
// even on partial results) so it can produce artifacts. Best-effort +
// panic-isolated — a PostRun failure never fails an otherwise-successful run.
if postRun != nil {
var transcript []llm.Message
if runRes != nil {
transcript = runRes.Messages
}
// Detach from the caller's ctx: a finished/cancelled caller must not abort
// artifact production (the hook owns its own bounding, per its contract).
res.PostRunResult = runPostRun(detach(ctx), postRun, transcript, res.Output, runErr)
}
e.finishAudit(ctx, rec, status, res, started, runErr)
if e.cfg.Ports.Budget != nil {
e.cfg.Ports.Budget.Commit(detach(ctx), inv.CallerID, time.Since(started).Seconds())
}
e.deliver(ctx, inv, res, runErr)
return res
}
// statusFor maps a run error to a RunStats.Status, distinguishing a critic kill
// (killed), a deadline (timeout), and a cancellation (cancelled — caller cancel
// or shutdown) from a generic error so audit consumers can tell them apart. The
// run context's cancellation cause carries the distinction (ErrCriticKill /
// DeadlineExceeded), since ctx.Err() alone only reports Canceled.
func statusFor(runCtx context.Context, runErr error) string {
switch {
case runErr == nil:
return "ok"
// Only the kill is recovered from the cancellation cause — a critic kill
// surfaces as a plain Canceled run error, so without this it'd read as
// "cancelled". Everything else is classified by the run error itself, so a
// genuine run error is never relabeled just because the context was later
// cancelled, and a caller cancel/deadline stays "cancelled" (not "timeout").
case errors.Is(context.Cause(runCtx), ErrCriticKill):
return "killed"
case errors.Is(runErr, context.DeadlineExceeded):
return "timeout"
case errors.Is(runErr, context.Canceled):
return "cancelled"
default:
return "error"
}
}
// finishAudit writes the terminal roll-up on a detached context so a cancelled
// run still records (mort's CleanupContextTimeout lesson).
func (e *Executor) finishAudit(ctx context.Context, rec RunRecorder, status string, res Result, started time.Time, runErr error) {
if rec == nil {
return
}
stats := RunStats{
Status: status,
Output: res.Output,
ToolCalls: rec.ToolCallsCount(),
RuntimeSeconds: time.Since(started).Seconds(),
}
if runErr != nil {
stats.Error = runErr.Error()
}
stats.InputTokens, stats.OutputTokens, stats.ThinkingTokens = rec.TokenStats()
rec.Close(detach(ctx), stats)
}
func (e *Executor) systemPrompt(ra RunnableAgent) string {
return e.systemPromptWithBody(ra.SystemPrompt)
}
// systemPromptWithBody composes the optional platform header with an arbitrary
// body. The single-loop path passes ra.SystemPrompt; the phase runner passes a
// phase's expanded instructions, so each phase keeps the platform header.
func (e *Executor) systemPromptWithBody(body string) string {
if e.cfg.SystemHeader == "" {
return body
}
if body == "" {
return e.cfg.SystemHeader
}
return e.cfg.SystemHeader + "\n\n" + body
}
// compactionThreshold returns the token threshold for the tier's model context
// window (ratio × limit), or 0 when the limit is unknown.
func (e *Executor) compactionThreshold(tier string) int {
max := e.cfg.ContextTokens(tier)
if max <= 0 {
return 0
}
return int(float64(max) * e.cfg.Defaults.CompactionThresholdRatio)
}
// deliver posts the run's output (or error) via run.Ports.Delivery when both a
// Delivery and a target (inv.DeliveryID) are set. No target = the caller reads
// Result.Output itself (the synchronous default). Best-effort + detached: a
// delivery failure must not change the run's outcome.
func (e *Executor) deliver(ctx context.Context, inv tool.Invocation, res Result, runErr error) {
if e.cfg.Ports.Delivery == nil || inv.DeliveryID == "" {
return
}
target := deliver.Target{Kind: inv.DeliveryKind, ID: inv.DeliveryID}
dctx := detach(ctx)
if runErr != nil {
_ = e.cfg.Ports.Delivery.DeliverError(dctx, target, runErr)
return
}
_, _ = e.cfg.Ports.Delivery.Deliver(dctx, target, res.Output, nil)
}
// detach derives a bounded cleanup context off ctx, detached from its
// cancellation, for post-run writes. The cancel is intentionally not returned;
// CleanupContextTimeout bounds the lifetime.
func detach(ctx context.Context) context.Context {
c, cancel := context.WithTimeout(context.WithoutCancel(ctx), CleanupContextTimeout)
_ = cancel // bounded by the timeout; nothing to cancel early
return c
}
// runAgent dispatches the majordomo agent loop. majordomo's Run takes a text-only
// input arg, so when the invocation carries images they're folded into the first
// user message (text + image parts) via WithHistory and Run is called with an
// empty input — the model then sees a multimodal opening turn. The image-less path
// passes the prompt straight through.
//
// The text part is omitted when input is blank (image-only run), matching
// runSession.AttachImages so no empty TextPart is sent.
func runAgent(ctx context.Context, ag *agent.Agent, input string, images []llm.ImagePart, opts ...agent.RunOption) (*agent.Result, error) {
if len(images) == 0 {
return ag.Run(ctx, input, opts...)
}
// Copy opts before appending so a caller-supplied backing array is never
// mutated/aliased (the variadic slice can have spare capacity). The multimodal
// opening turn (text + image parts) is built by the shared helper.
opts = append(opts[:len(opts):len(opts)], agent.WithHistory([]llm.Message{multimodalUserMessage(input, images)}))
return ag.Run(ctx, "", opts...)
}