package run

import (
	"context"
	"errors"
	"fmt"
	"log/slog"
	"time"

	"gitea.stevedudenhoeffer.com/steve/majordomo/agent"
	"gitea.stevedudenhoeffer.com/steve/majordomo/llm"

	"gitea.stevedudenhoeffer.com/steve/executus/compact"
	"gitea.stevedudenhoeffer.com/steve/executus/deliver"
	"gitea.stevedudenhoeffer.com/steve/executus/tool"
)

// ModelResolver turns a tier alias (or a concrete model spec) into a usable
// llm.Model. It also returns an enriched context, which the executor adopts for
// the rest of the run so usage can be attributed. model.ParseModelForContext
// satisfies this signature.
type ModelResolver func(ctx context.Context, tier string) (context.Context, llm.Model, error)

// Defaults holds the executor-wide fallback caps and loop guards. Each value is
// applied to a run only when the RunnableAgent leaves the matching field zero.
type Defaults struct {
	// MaxIterations caps tool-dispatch steps. Default 12.
	MaxIterations int
	// MaxRuntime is the wall-clock budget per run. Default 60s.
	MaxRuntime time.Duration
	// FallbackTier is the tier used when the agent names none. Default "fast".
	FallbackTier string
	// MaxConsecutiveToolErrors is the loop guard. Default 3.
	MaxConsecutiveToolErrors int
	// MaxSameToolCallRepeats is the retry-storm guard. Default 3.
	MaxSameToolCallRepeats int
	// CompactionThresholdRatio is the fraction of the model context window at
	// which compaction kicks in. Default 0.7.
	CompactionThresholdRatio float64
	// CriticSoftTimeout is the idle window before the critic wakes. Default 90s.
	CriticSoftTimeout time.Duration
	// CriticAbsoluteMax is the failsafe wall-clock ceiling for a critic-OWNED
	// run (Ports.Critic set AND the agent enables it). In that mode MaxRuntime
	// is only the SOFT trigger and the critic's extendable backstop is the
	// normal deadline, so this ceiling fires only when the critic never acts
	// (a broken/nil host handle). Default 6h; never shorter than the run's
	// MaxRuntime. Runs without a critic ignore it and keep the literal
	// MaxRuntime kill.
	CriticAbsoluteMax time.Duration
}

// withFallbacks returns a copy of d in which every unset field has been
// replaced by its documented default. Numeric and duration fields count as
// unset when they are <= 0; FallbackTier counts as unset when it is empty.
func (d Defaults) withFallbacks() Defaults {
	pickInt := func(v, fallback int) int {
		if v <= 0 {
			return fallback
		}
		return v
	}
	pickDur := func(v, fallback time.Duration) time.Duration {
		if v <= 0 {
			return fallback
		}
		return v
	}

	out := d
	out.MaxIterations = pickInt(d.MaxIterations, 12)
	out.MaxRuntime = pickDur(d.MaxRuntime, 60*time.Second)
	if d.FallbackTier == "" {
		out.FallbackTier = "fast"
	}
	out.MaxConsecutiveToolErrors = pickInt(d.MaxConsecutiveToolErrors, 3)
	out.MaxSameToolCallRepeats = pickInt(d.MaxSameToolCallRepeats, 3)
	if d.CompactionThresholdRatio <= 0 {
		out.CompactionThresholdRatio = 0.7
	}
	out.CriticSoftTimeout = pickDur(d.CriticSoftTimeout, 90*time.Second)
	out.CriticAbsoluteMax = pickDur(d.CriticAbsoluteMax, 6*time.Hour)
	return out
}

// Config is the wiring for an Executor. Only Registry and Models are mandatory.
// Every other field is optional and nil-safe: leaving them all zero gives a
// bounded, in-memory run without persistence, audit, budget, critic, delegation
// or compaction (gadfly's case).
type Config struct {
	Registry tool.Registry
	Models   ModelResolver
	Defaults Defaults
	Ports    Ports

	// Compactor mints the per-run context-compaction hook; nil turns
	// compaction off.
	Compactor compact.CompactorFactory
	// ContextTokens reports a tier's model context window, which feeds the
	// compaction threshold. A nil func — or a zero return — also turns
	// compaction off.
	ContextTokens func(tier string) int

	// SystemHeader is an optional platform header placed in front of every
	// agent's system prompt.
	SystemHeader string
}

// Executor drives a RunnableAgent through majordomo's agent loop using the
// wired Ports. Build one with New; a single Executor is safe for concurrent use
// across runs.
type Executor struct {
	cfg Config
}

// New constructs an Executor from cfg, filling unset Defaults with their
// fallbacks. A nil Registry or Models is a structural (not runtime) mistake, so
// New panics rather than returning an error.
func New(cfg Config) *Executor {
	switch {
	case cfg.Registry == nil, cfg.Models == nil:
		panic("run.New: Registry and Models are required")
	}
	ex := &Executor{cfg: cfg}
	ex.cfg.Defaults = ex.cfg.Defaults.withFallbacks()
	return ex
}

// Result is one run's outcome.
// Err carries the run failure (if any); the other fields are populated
// best-effort even on error (partial output/steps/usage).
type Result struct {
	RunID  string
	Output string
	Steps  []tool.Step
	Usage  llm.Usage
	Err    error
	// PostRunResult carries artifacts produced by a SessionToolFactory's PostRun
	// hook (rendered images, files). nil when no factory was set or PostRun
	// returned nil. The host delivers these (e.g. mort's chat API / Discord).
	PostRunResult *tool.PostRunResult
}

// Run executes ra with the given invocation + input and returns the Result. It
// never propagates a panic; failures surface in Result.Err (a top-level recover
// converts any panic — including from a host Port — into a run error).
//
// Order of operations: resolve tier/caps from ra (falling back to Defaults),
// budget gate, model resolution, audit start, checkpointer resolution, toolbox
// build (low-level + delegation + extra + session tools), run-context setup,
// critic start, the agent loop (single-loop or multi-phase), then PostRun,
// audit close, budget commit and delivery.
//
// Early failures before the agent loop return with only RunID and Err set; the
// failures that happen after audit start also close the audit record with
// status "error".
func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocation, input string) (res Result) {
	started := time.Now()
	res = Result{RunID: inv.RunID}

	// ckpt is the per-run durable checkpointer (resolved below; nil = non-durable).
	// checkpointCause yields the run context's cancellation cause once the run
	// context exists; nil before then (an early build-error return).
	var ckpt Checkpointer
	var checkpointCause func() error

	// Enforce the no-panic contract: a panic anywhere in the run (incl. a host
	// Critic/Audit/Palette callback on the main goroutine) becomes Result.Err
	// rather than unwinding into the caller. This defer ALSO finalizes the
	// checkpoint on EVERY exit path — panic, an early build-error return (before
	// the run loop), or normal completion — so a recovered run's durable record is
	// never left dangling (which would loop boot-recovery on a persistent error).
	//
	// The closure reads ctx, ckpt and res at exit time, so it sees the
	// model-enriched ctx and the resolved checkpointer assigned further down.
	//
	// NOTE(review): being registered first, this defer runs LAST — after the
	// deferred stopCritic / mergeCancel / cancelCause(nil) / cancelTimeout below.
	// context.Cause(runCtx) is therefore read from an already-cancelled context
	// and may report a cleanup cancellation rather than nil on a clean exit;
	// confirm finalizeCheckpoint tolerates that.
	defer func() {
		if r := recover(); r != nil {
			res.Err = fmt.Errorf("run.Executor: recovered panic: %v", r)
		}
		var cause error
		if checkpointCause != nil {
			cause = checkpointCause()
		}
		finalizeCheckpoint(ctx, ckpt, res.Err, cause)
	}()

	// Per-run caps: the agent's own value wins; a zero/negative value falls back
	// to the executor Defaults.
	tier := ra.ModelTier
	if tier == "" {
		tier = e.cfg.Defaults.FallbackTier
	}
	maxIter := ra.MaxIterations
	if maxIter <= 0 {
		maxIter = e.cfg.Defaults.MaxIterations
	}
	maxRuntime := ra.MaxRuntime
	if maxRuntime <= 0 {
		maxRuntime = e.cfg.Defaults.MaxRuntime
	}

	// Budget gate (pre-run): a rejected run makes no model call.
	if e.cfg.Ports.Budget != nil {
		if err := e.cfg.Ports.Budget.Check(ctx, inv.CallerID); err != nil {
			res.Err = err
			return res
		}
	}

	// Resolve the model (enriches ctx for usage attribution).
	modelCtx, model, err := e.cfg.Models(ctx, tier)
	if err != nil {
		res.Err = fmt.Errorf("resolve model %q: %w", tier, err)
		return res
	}
	if model == nil {
		// A resolver returning (ctx, nil, nil) would otherwise nil-panic inside
		// the agent loop; surface it as a clean error (Run never panics out).
		res.Err = fmt.Errorf("resolve model %q: resolver returned a nil model", tier)
		return res
	}
	ctx = modelCtx

	// Audit start (optional). The recorder satisfies RunTally; stamp it on the
	// invocation so a self-status tool can read live progress.
	info := RunInfo{
		RunID:         inv.RunID,
		SubjectID:     ra.ID,
		Name:          ra.Name,
		CallerID:      inv.CallerID,
		ChannelID:     inv.ChannelID,
		GuildID:       inv.GuildID,
		ParentRunID:   inv.ParentRunID,
		ModelTier:     tier,
		Inputs:        inv.SkillInputs,
		StartedAt:     started,
		MaxIterations: maxIter,
	}
	var rec RunRecorder
	var stateAcc *RunStateAccessor
	if e.cfg.Ports.Audit != nil {
		rec = e.cfg.Ports.Audit.StartRun(ctx, info)
	}
	if rec != nil {
		stateAcc = NewRunStateAccessor(rec, maxIter, 0, started)
		inv.RunState = stateAcc
	}

	// Durable recovery (optional): a recovered run carries a ResumeState (prior
	// transcript / completed phases) + an existing Checkpointer in ctx so it
	// continues on the SAME durable record; a fresh run mints a per-run
	// Checkpointer via the factory (which decides durability — nil = non-durable).
	// nil-safe throughout.
	resume := resumeStateFromContext(ctx)
	ckpt = existingCheckpointerFromContext(ctx)
	if ckpt == nil && e.cfg.Ports.Checkpointer != nil {
		c, cerr := e.cfg.Ports.Checkpointer.Begin(ctx, info)
		if cerr != nil {
			// Degrade to non-durable (the documented contract) but log it — a
			// failing checkpoint store must not fail the run, yet shouldn't be silent.
			slog.Warn("run: checkpointer Begin failed; running non-durable", "run_id", inv.RunID, "error", cerr)
		} else {
			ckpt = c
		}
	}

	// Steer mailbox: lets session tools (via inv.AttachImages) feed multimodal
	// messages into the running conversation before its next step. Created BEFORE
	// the toolbox build so any tool's handler captures the live AttachImages seam.
	mailbox := &steerMailbox{}
	inv.AttachImages = (&runSession{mailbox: mailbox}).AttachImages

	// Build the toolbox from the agent's low-level tools.
	toolbox, err := e.cfg.Registry.Build(ra.LowLevelTools, inv, tool.Visibility("private"), nil)
	if err != nil {
		res.Err = fmt.Errorf("build toolbox: %w", err)
		e.finishAudit(ctx, rec, "error", res, started, res.Err)
		return res
	}

	// Add skill__/agent__ delegation tools from the agent's palette (nil-safe:
	// no PaletteSource or empty palette → no delegation tools).
	if err := addDelegationTools(toolbox, ra, inv, e.cfg.Ports.Palette); err != nil {
		res.Err = fmt.Errorf("build delegation tools: %w", err)
		e.finishAudit(ctx, rec, "error", res, started, res.Err)
		return res
	}

	// Per-invocation ExtraTools + a SessionToolFactory's per-run tools, added on
	// top of the agent's palette. The factory closes over the live session (the
	// AttachImages mailbox); its PostRun hook (held for after the run) produces
	// artifacts attached to res.PostRunResult, and its Cleanup is deferred. All
	// nil-safe.
	for _, t := range inv.ExtraTools {
		if err := toolbox.Add(t); err != nil {
			res.Err = fmt.Errorf("add extra tool: %w", err)
			e.finishAudit(ctx, rec, "error", res, started, res.Err)
			return res
		}
	}
	var postRun func(ctx context.Context, transcript []llm.Message, output string, runErr error) *tool.PostRunResult
	if inv.SessionToolFactory != nil {
		st := inv.SessionToolFactory(&runSession{mailbox: mailbox})
		if st.Cleanup != nil {
			defer safeCleanup(st.Cleanup) // panic-isolated, like runPostRun
		}
		for _, t := range st.Tools {
			if err := toolbox.Add(t); err != nil {
				res.Err = fmt.Errorf("add session tool: %w", err)
				e.finishAudit(ctx, rec, "error", res, started, res.Err)
				return res
			}
		}
		postRun = st.PostRun
	}

	// Run context: detached from the caller's deadline so a lane/queue wait doesn't
	// eat the run budget (mort's V10 lesson). Caller cancellation still propagates
	// via MergeCancellation. Created BEFORE the step observer so the observer
	// forwards the merged run context (not a possibly-cancelled caller ctx) to
	// OnStep consumers.
	//
	// Two-tier timeout: who owns the hard deadline depends on the critic.
	//   - NO critic (the default): MaxRuntime is a literal WithTimeout. Its
	//     DeadlineExceeded propagates through the child chain (→ "timeout"),
	//     preserving the run's-own-timeout vs caller-cancel distinction.
	//   - critic OWNS the deadline (Ports.Critic set + ra.Critic.Enabled):
	//     MaxRuntime becomes the SOFT trigger (passed to startCritic), and the
	//     critic's extendable backstop — watched in startCritic, which cancels via
	//     cancelCause — is the real deadline. A slow-but-progressing run is given
	//     room up to the backstop; only a stalled one is killed. We still wrap a
	//     GENEROUS WithTimeout at CriticAbsoluteMax so a broken/nil critic handle
	//     can't run unbounded; that ceiling never fires before the critic's backstop.
	//
	// A NESTED cause-carrying layer (cancelCause) lets a critic kill surface as a
	// distinct "killed": only an ErrCriticKill cause is consulted in statusFor; a
	// generic run error, a backstop expiry, or a caller cancel is classified by the
	// run error itself.
	criticOwnsDeadline := e.cfg.Ports.Critic != nil && ra.Critic.Enabled
	hardCap := maxRuntime
	if criticOwnsDeadline {
		hardCap = e.cfg.Defaults.CriticAbsoluteMax
		if hardCap < maxRuntime {
			hardCap = maxRuntime // the failsafe ceiling is never shorter than the nominal budget
		}
	}
	timeoutCtx, cancelTimeout := context.WithTimeout(context.WithoutCancel(ctx), hardCap)
	defer cancelTimeout()
	runCtx, cancelCause := context.WithCancelCause(timeoutCtx)
	defer cancelCause(nil)
	runCtx, mergeCancel := MergeCancellation(runCtx, ctx)
	defer mergeCancel()

	// The finalize defer (top of Run) now has a run context to read the
	// cancellation cause from (shutdown vs critic-kill vs deadline vs cancel).
	checkpointCause = func() error { return context.Cause(runCtx) }

	// Critic (optional): monitors the run for a stall, can nudge/extend/kill via
	// its host Escalator. When it owns the deadline, MaxRuntime is its soft trigger
	// (so a slow-but-progressing run survives past it); its extendable backstop is
	// bound to runCtx (cancel on pass). nil-safe: no-op when no critic is configured
	// or the agent doesn't enable it.
	critic, stopCritic := e.startCritic(runCtx, cancelCause, ra, info, maxRuntime)
	defer stopCritic()

	// Step instrumentation: accumulate Result.Steps + fire inv.OnStep, feed the
	// audit recorder, and keep the live iteration counter fresh. majordomo's
	// step observer hands us each completed iteration; we zip the model's tool
	// calls with their executed results PAIRWISE — a result without a matching
	// call (or a call without a result) is skipped rather than recorded as an
	// empty-name "ghost" step.
	emitter := newStepEmitter(inv.OnStep)
	stepObserver := func(s agent.Step) {
		if stateAcc != nil {
			stateAcc.SetIteration(s.Index)
		}
		if rec != nil {
			rec.OnStep(s.Index, s.Response)
		}
		critic.recordStep(s.Index, s.Response) // keep the critic's activity clock fresh + carry the step payload
		var calls []llm.ToolCall
		if s.Response != nil {
			calls = s.Response.ToolCalls
		}
		// n is the shorter of the two lists, so only matched call/result pairs
		// are emitted.
		n := len(s.Results)
		if len(calls) < n {
			n = len(calls)
		}
		for i := 0; i < n; i++ {
			call, r := calls[i], s.Results[i]
			critic.recordToolStart(call.Name, string(call.Arguments))
			emitter.toolStart(runCtx, call.Name, call.Arguments)
			emitter.toolEnd(runCtx, call, r.Content, r.IsError)
			if rec != nil {
				rec.OnTool(call, r.Content)
			}
		}
	}

	// Shared agent options used by BOTH the single-loop path and every phase: the
	// tool-error guards and optional compaction. The toolbox, step ceiling, AND
	// step observer are added per path (the observer is wrapped for checkpointing,
	// which differs single-loop vs per-phase).
	sharedOpts := []agent.Option{
		agent.WithToolErrorLimits(e.cfg.Defaults.MaxConsecutiveToolErrors, e.cfg.Defaults.MaxSameToolCallRepeats),
	}
	if e.cfg.Compactor != nil && e.cfg.ContextTokens != nil {
		if threshold := e.compactionThreshold(tier); threshold > 0 {
			// Forward compaction events to the audit log (makes the
			// CompactionEvent doc's "logged to the run trace" promise true).
			// onFire stays nil when there is no recorder.
			var onFire func(compact.CompactionEvent)
			if rec != nil {
				onFire = func(ev compact.CompactionEvent) {
					rec.LogEvent("compaction_fired", map[string]any{
						"messages_before": ev.MessagesBefore,
						"messages_after":  ev.MessagesAfter,
						"tokens_before":   ev.TokensBefore,
						"tokens_after":    ev.TokensAfter,
					})
				}
			}
			sharedOpts = append(sharedOpts, agent.WithCompactor(e.cfg.Compactor(threshold, onFire)))
		}
	}

	// Stage non-image input attachments (audio/PDF/binary) into the host file
	// store and fold an [ATTACHED FILES] descriptor into the prompt so the agent
	// can reach them by file_id. No-op when Ports.InputFiles is nil or there are
	// no files. Done after the model/toolbox build but before the loop, so the
	// descriptor rides the very first user turn.
	input = e.stageInputFiles(runCtx, inv.RunID, ra.ID, inv.InputFiles, input)

	// One WithSteer drains BOTH the session mailbox (a tool's AttachImages) and
	// the critic's nudges before each step.
	steer := func() []llm.Message { return append(mailbox.drain(), critic.drainSteer()...) }

	// resuming is true only when a ResumeState with a non-empty saved transcript
	// was found in ctx.
	resuming := resume != nil && len(resume.History) > 0
	var runRes *agent.Result
	var runErr error
	if len(ra.Phases) == 0 {
		// Single-loop run: the agent's base prompt + full toolbox, with the
		// critic's DYNAMIC step ceiling (WithMaxStepsFunc, so it can raise a
		// healthy-but-long run's budget mid-flight; falls back to maxIter).
		//
		// Checkpointing: wrap the step observer to accumulate the running transcript
		// and Save it each step. Save is called every step; THROTTLING is the
		// Checkpointer's responsibility (the battery + mort's durable-job adapter
		// both throttle + size-cap), so the kernel doesn't gate the hot path. The
		// accumulated transcript is the pre-compaction one (the observer sees raw
		// step responses, not the loop's compacted history) — a host that caps size
		// bounds it. A recovered run seeds the saved transcript and continues.
		obs := stepObserver
		if ckpt != nil {
			var acc []llm.Message
			if resuming {
				// Copy so appends below never write into resume.History's backing array.
				acc = append([]llm.Message(nil), resume.History...)
			} else {
				acc = []llm.Message{multimodalUserMessage(input, inv.Images)}
			}
			obs = func(s agent.Step) {
				stepObserver(s)
				if s.Response != nil {
					acc = append(acc, s.Response.Message())
				}
				if len(s.Results) > 0 {
					acc = append(acc, llm.ToolResultsMessage(s.Results...))
				}
				// The Save error is discarded: a checkpoint write failure does
				// not stop or fail the run.
				_ = ckpt.Save(runCtx, RunCheckpointState{Messages: acc, Iteration: s.Index + 1})
			}
		}
		opts := append([]agent.Option{
			agent.WithToolbox(toolbox),
			critic.maxStepsOption(maxIter),
			agent.WithStepObserver(obs),
		}, sharedOpts...)
		ag := agent.New(model, e.systemPrompt(ra), opts...)
		if resuming {
			// Resume: seed the saved transcript and continue (no new input — the
			// completed tool calls in the transcript are NOT re-run).
			runRes, runErr = ag.Run(runCtx, "", agent.WithSteer(steer), agent.WithHistory(resume.History))
		} else {
			runRes, runErr = runAgent(runCtx, ag, input, inv.Images, agent.WithSteer(steer))
		}
	} else {
		// Multi-phase pipeline: each phase runs its own prompt/tier/tools/step-cap
		// sequentially, threading outputs through {{.}} templates. The
		// shared step observer (audit/steps/critic) is wired per phase by the phase
		// runner; checkpointing is phase-boundary granular (completed phases are
		// recorded so a resumed run skips them).
		runRes, runErr = e.runPhases(runCtx, ra, phaseDeps{
			baseModel:    model,
			baseToolbox:  toolbox,
			baseMaxIter:  maxIter,
			sharedOpts:   sharedOpts,
			stepObserver: stepObserver,
			steer:        steer,
			rec:          rec,
			checkpointer: ckpt,
			resume:       resume,
		}, input, inv.Images)
	}

	// Durable-recovery finalize (Complete/Fail/leave-running) happens in the
	// top-of-Run defer so it covers panics + early build-error returns too.
	//
	// status is computed here, while runCtx is still live (before the deferred
	// cancels run), so the cancellation cause it reads is the run's own.
	status := statusFor(runCtx, runErr)
	if runRes != nil {
		res.Output = runRes.Output
		res.Usage = runRes.Usage
	}
	res.Steps = emitter.snapshot()
	res.Err = runErr

	// PostRun: hand the SessionToolFactory's hook the full transcript (populated
	// even on partial results) so it can produce artifacts. Best-effort +
	// panic-isolated — a PostRun failure never fails an otherwise-successful run.
	if postRun != nil {
		var transcript []llm.Message
		if runRes != nil {
			transcript = runRes.Messages
		}
		// Detach from the caller's ctx: a finished/cancelled caller must not abort
		// artifact production (the hook owns its own bounding, per its contract).
		res.PostRunResult = runPostRun(detach(ctx), postRun, transcript, res.Output, runErr)
	}

	// Terminal bookkeeping: close the audit record, commit the elapsed wall-clock
	// seconds to the budget, then deliver the outcome.
	e.finishAudit(ctx, rec, status, res, started, runErr)
	if e.cfg.Ports.Budget != nil {
		e.cfg.Ports.Budget.Commit(detach(ctx), inv.CallerID, time.Since(started).Seconds())
	}
	e.deliver(ctx, inv, res, runErr)
	return res
}

// statusFor maps a run error to a RunStats.Status, distinguishing a critic kill
// (killed), a deadline (timeout), and a cancellation (cancelled — caller cancel
// or shutdown) from a generic error so audit consumers can tell them apart.
//
// Only the critic kill is read from the run context's cancellation cause
// (ErrCriticKill), since the run error for a kill is just a plain Canceled.
// Deadline and cancellation are classified from runErr itself.
//
// Returns one of "ok", "killed", "timeout", "cancelled" or "error".
func statusFor(runCtx context.Context, runErr error) string {
	switch {
	case runErr == nil:
		return "ok"
	// Only the kill is recovered from the cancellation cause — a critic kill
	// surfaces as a plain Canceled run error, so without this it'd read as
	// "cancelled". Everything else is classified by the run error itself, so a
	// genuine run error is never relabeled just because the context was later
	// cancelled, and a caller cancel/deadline stays "cancelled" (not "timeout").
	case errors.Is(context.Cause(runCtx), ErrCriticKill):
		return "killed"
	case errors.Is(runErr, context.DeadlineExceeded):
		return "timeout"
	case errors.Is(runErr, context.Canceled):
		return "cancelled"
	default:
		return "error"
	}
}

// finishAudit writes the terminal roll-up on a detached context so a cancelled
// run still records (mort's CleanupContextTimeout lesson).
//
// It is a no-op when rec is nil. The roll-up carries the given status, the
// run's output, the recorder's tool-call count and token stats, the elapsed
// time since started, and runErr's message when runErr is non-nil.
func (e *Executor) finishAudit(ctx context.Context, rec RunRecorder, status string, res Result, started time.Time, runErr error) {
	if rec == nil {
		return
	}
	stats := RunStats{
		Status:         status,
		Output:         res.Output,
		ToolCalls:      rec.ToolCallsCount(),
		RuntimeSeconds: time.Since(started).Seconds(),
	}
	if runErr != nil {
		stats.Error = runErr.Error()
	}
	stats.InputTokens, stats.OutputTokens, stats.ThinkingTokens = rec.TokenStats()
	rec.Close(detach(ctx), stats)
}

// systemPrompt returns the agent's system prompt with the optional platform
// header prepended (see systemPromptWithBody).
func (e *Executor) systemPrompt(ra RunnableAgent) string {
	return e.systemPromptWithBody(ra.SystemPrompt)
}

// systemPromptWithBody composes the optional platform header with an arbitrary
// body. The single-loop path passes ra.SystemPrompt; the phase runner passes a
// phase's expanded instructions, so each phase keeps the platform header.
//
// With no header the body is returned unchanged; with an empty body the header
// is returned alone; otherwise the two are joined by a blank line.
func (e *Executor) systemPromptWithBody(body string) string {
	if e.cfg.SystemHeader == "" {
		return body
	}
	if body == "" {
		return e.cfg.SystemHeader
	}
	return e.cfg.SystemHeader + "\n\n" + body
}

// compactionThreshold returns the token threshold for the tier's model context
// window (ratio × limit), or 0 when the limit is unknown.
func (e *Executor) compactionThreshold(tier string) int { max := e.cfg.ContextTokens(tier) if max <= 0 { return 0 } return int(float64(max) * e.cfg.Defaults.CompactionThresholdRatio) } // deliver posts the run's output (or error) via run.Ports.Delivery when both a // Delivery and a target (inv.DeliveryID) are set. No target = the caller reads // Result.Output itself (the synchronous default). Best-effort + detached: a // delivery failure must not change the run's outcome. func (e *Executor) deliver(ctx context.Context, inv tool.Invocation, res Result, runErr error) { if e.cfg.Ports.Delivery == nil || inv.DeliveryID == "" { return } target := deliver.Target{Kind: inv.DeliveryKind, ID: inv.DeliveryID} dctx := detach(ctx) if runErr != nil { _ = e.cfg.Ports.Delivery.DeliverError(dctx, target, runErr) return } _, _ = e.cfg.Ports.Delivery.Deliver(dctx, target, res.Output, nil) } // detach derives a bounded cleanup context off ctx, detached from its // cancellation, for post-run writes. The cancel is intentionally not returned; // CleanupContextTimeout bounds the lifetime. func detach(ctx context.Context) context.Context { c, cancel := context.WithTimeout(context.WithoutCancel(ctx), CleanupContextTimeout) _ = cancel // bounded by the timeout; nothing to cancel early return c } // runAgent dispatches the majordomo agent loop. majordomo's Run takes a text-only // input arg, so when the invocation carries images they're folded into the first // user message (text + image parts) via WithHistory and Run is called with an // empty input — the model then sees a multimodal opening turn. The image-less path // passes the prompt straight through. // // The text part is omitted when input is blank (image-only run), matching // runSession.AttachImages so no empty TextPart is sent. func runAgent(ctx context.Context, ag *agent.Agent, input string, images []llm.ImagePart, opts ...agent.RunOption) (*agent.Result, error) { if len(images) == 0 { return ag.Run(ctx, input, opts...) 
} // Copy opts before appending so a caller-supplied backing array is never // mutated/aliased (the variadic slice can have spare capacity). The multimodal // opening turn (text + image parts) is built by the shared helper. opts = append(opts[:len(opts):len(opts)], agent.WithHistory([]llm.Message{multimodalUserMessage(input, images)})) return ag.Run(ctx, "", opts...) }