package run import ( "context" "errors" "fmt" "time" "gitea.stevedudenhoeffer.com/steve/majordomo/agent" "gitea.stevedudenhoeffer.com/steve/majordomo/llm" "gitea.stevedudenhoeffer.com/steve/executus/compact" "gitea.stevedudenhoeffer.com/steve/executus/deliver" "gitea.stevedudenhoeffer.com/steve/executus/tool" ) // ModelResolver resolves a tier alias or concrete spec to a usable llm.Model // and an enriched context (for usage attribution). model.ParseModelForContext // satisfies it. type ModelResolver func(ctx context.Context, tier string) (context.Context, llm.Model, error) // Defaults are the executor's fallback caps and loop guards, applied per run // when the RunnableAgent leaves a field zero. type Defaults struct { MaxIterations int // tool-dispatch steps; default 12 MaxRuntime time.Duration // wall-clock per run; default 60s FallbackTier string // tier when the agent's is empty; default "fast" MaxConsecutiveToolErrors int // loop guard; default 3 MaxSameToolCallRepeats int // retry-storm guard; default 3 CompactionThresholdRatio float64 // fraction of model context to compact at; default 0.7 CriticSoftTimeout time.Duration // idle window before the critic wakes; default 90s } func (d Defaults) withFallbacks() Defaults { if d.MaxIterations <= 0 { d.MaxIterations = 12 } if d.MaxRuntime <= 0 { d.MaxRuntime = 60 * time.Second } if d.FallbackTier == "" { d.FallbackTier = "fast" } if d.MaxConsecutiveToolErrors <= 0 { d.MaxConsecutiveToolErrors = 3 } if d.MaxSameToolCallRepeats <= 0 { d.MaxSameToolCallRepeats = 3 } if d.CompactionThresholdRatio <= 0 { d.CompactionThresholdRatio = 0.7 } if d.CriticSoftTimeout <= 0 { d.CriticSoftTimeout = 90 * time.Second } return d } // Config wires an Executor. Registry + Models are required; everything else is // optional and nil-safe — the zero Config beyond those yields a bounded, // in-memory run with no persistence/audit/budget/critic/delegation/compaction // (gadfly's case). type Config struct { Registry tool.Registry Models ModelResolver Defaults Defaults Ports Ports // Compactor mints the per-run context-compaction hook. nil disables // compaction. ContextTokens resolves a tier's model context-window (for // the compaction threshold); nil — or a zero return — also disables it. Compactor compact.CompactorFactory ContextTokens func(tier string) int // SystemHeader is an optional platform header prepended to every agent's // system prompt. SystemHeader string } // Executor runs a RunnableAgent through majordomo's agent loop with the wired // Ports. Construct with New; safe for concurrent use across runs. type Executor struct { cfg Config } // New builds an Executor. It panics if Registry or Models is nil — those are // structural, not runtime, errors. func New(cfg Config) *Executor { if cfg.Registry == nil || cfg.Models == nil { panic("run.New: Registry and Models are required") } cfg.Defaults = cfg.Defaults.withFallbacks() return &Executor{cfg: cfg} } // Result is one run's outcome. Err carries the run failure (if any); the other // fields are populated best-effort even on error (partial output/steps/usage). type Result struct { RunID string Output string Steps []tool.Step Usage llm.Usage Err error } // Run executes ra with the given invocation + input and returns the Result. It // never propagates a panic; failures surface in Result.Err (a top-level recover // converts any panic — including from a host Port — into a run error). func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocation, input string) (res Result) { started := time.Now() res = Result{RunID: inv.RunID} // Enforce the no-panic contract: a panic anywhere in the run (incl. a host // Critic/Audit/Palette callback on the main goroutine) becomes Result.Err // rather than unwinding into the caller. defer func() { if r := recover(); r != nil { res.Err = fmt.Errorf("run.Executor: recovered panic: %v", r) } }() tier := ra.ModelTier if tier == "" { tier = e.cfg.Defaults.FallbackTier } maxIter := ra.MaxIterations if maxIter <= 0 { maxIter = e.cfg.Defaults.MaxIterations } maxRuntime := ra.MaxRuntime if maxRuntime <= 0 { maxRuntime = e.cfg.Defaults.MaxRuntime } // Budget gate (pre-run): a rejected run makes no model call. if e.cfg.Ports.Budget != nil { if err := e.cfg.Ports.Budget.Check(ctx, inv.CallerID); err != nil { res.Err = err return res } } // Resolve the model (enriches ctx for usage attribution). modelCtx, model, err := e.cfg.Models(ctx, tier) if err != nil { res.Err = fmt.Errorf("resolve model %q: %w", tier, err) return res } if model == nil { // A resolver returning (ctx, nil, nil) would otherwise nil-panic inside // the agent loop; surface it as a clean error (Run never panics out). res.Err = fmt.Errorf("resolve model %q: resolver returned a nil model", tier) return res } ctx = modelCtx // Audit start (optional). The recorder satisfies RunTally; stamp it on the // invocation so a self-status tool can read live progress. info := RunInfo{ RunID: inv.RunID, SubjectID: ra.ID, Name: ra.Name, CallerID: inv.CallerID, ChannelID: inv.ChannelID, ParentRunID: inv.ParentRunID, Inputs: inv.SkillInputs, StartedAt: started, MaxIterations: maxIter, } var rec RunRecorder var stateAcc *RunStateAccessor if e.cfg.Ports.Audit != nil { rec = e.cfg.Ports.Audit.StartRun(ctx, info) } if rec != nil { stateAcc = NewRunStateAccessor(rec, maxIter, 0, started) inv.RunState = stateAcc } // Build the toolbox from the agent's low-level tools. toolbox, err := e.cfg.Registry.Build(ra.LowLevelTools, inv, tool.Visibility("private"), nil) if err != nil { res.Err = fmt.Errorf("build toolbox: %w", err) e.finishAudit(ctx, rec, "error", res, started, res.Err) return res } // Add skill__/agent__ delegation tools from the agent's palette (nil-safe: // no PaletteSource or empty palette → no delegation tools). if err := addDelegationTools(toolbox, ra, inv, e.cfg.Ports.Palette); err != nil { res.Err = fmt.Errorf("build delegation tools: %w", err) e.finishAudit(ctx, rec, "error", res, started, res.Err) return res } // Run context: bound by MaxRuntime, detached from the caller's deadline so a // lane/queue wait doesn't eat the run budget (mort's V10 lesson). Caller // cancellation still propagates via MergeCancellation. Created BEFORE the // step observer so the observer forwards the merged run context (not a // possibly-cancelled caller ctx) to OnStep consumers. // Cause-carrying cancel so a critic kill, a backstop timeout, and a caller // cancel land as distinct statuses. MaxRuntime is enforced by a timer that // cancels with DeadlineExceeded (preserving the old WithTimeout → "timeout"). runCtx, cancelCause := context.WithCancelCause(context.WithoutCancel(ctx)) defer cancelCause(nil) runTimer := time.AfterFunc(maxRuntime, func() { cancelCause(context.DeadlineExceeded) }) defer runTimer.Stop() runCtx, mergeCancel := MergeCancellation(runCtx, ctx) defer mergeCancel() // Critic (optional): monitors the run for a stall, can nudge/extend/kill via // its host Escalator. Its hard deadline is bound to runCtx (cancel on pass). // nil-safe: no-op when no critic is configured or the agent doesn't enable it. critic, stopCritic := e.startCritic(runCtx, cancelCause, ra, info) defer stopCritic() // Step instrumentation: accumulate Result.Steps + fire inv.OnStep, feed the // audit recorder, and keep the live iteration counter fresh. majordomo's // step observer hands us each completed iteration; we zip the model's tool // calls with their executed results PAIRWISE — a result without a matching // call (or a call without a result) is skipped rather than recorded as an // empty-name "ghost" step. emitter := newStepEmitter(inv.OnStep) stepObserver := func(s agent.Step) { if stateAcc != nil { stateAcc.SetIteration(s.Index) } if rec != nil { rec.OnStep(s.Index, s.Response) } critic.recordStep(s.Index, s.Response) // keep the critic's activity clock fresh + carry the step payload var calls []llm.ToolCall if s.Response != nil { calls = s.Response.ToolCalls } n := len(s.Results) if len(calls) < n { n = len(calls) } for i := 0; i < n; i++ { call, r := calls[i], s.Results[i] critic.recordToolStart(call.Name, string(call.Arguments)) emitter.toolStart(runCtx, call.Name, call.Arguments) emitter.toolEnd(runCtx, call, r.Content, r.IsError) if rec != nil { rec.OnTool(call, r.Content) } } } opts := []agent.Option{ agent.WithToolbox(toolbox), // Step ceiling: a fixed WithMaxSteps(maxIter) normally, but when a critic is // active it owns a DYNAMIC ceiling (WithMaxStepsFunc) so it can raise a // healthy-but-long run's budget mid-flight. Falls back to maxIter. critic.maxStepsOption(maxIter), agent.WithToolErrorLimits(e.cfg.Defaults.MaxConsecutiveToolErrors, e.cfg.Defaults.MaxSameToolCallRepeats), agent.WithStepObserver(stepObserver), } if e.cfg.Compactor != nil && e.cfg.ContextTokens != nil { if threshold := e.compactionThreshold(tier); threshold > 0 { // Forward compaction events to the audit log (makes the // CompactionEvent doc's "logged to the run trace" promise true). var onFire func(compact.CompactionEvent) if rec != nil { onFire = func(ev compact.CompactionEvent) { rec.LogEvent("compaction_fired", map[string]any{ "messages_before": ev.MessagesBefore, "messages_after": ev.MessagesAfter, "tokens_before": ev.TokensBefore, "tokens_after": ev.TokensAfter, }) } } opts = append(opts, agent.WithCompactor(e.cfg.Compactor(threshold, onFire))) } } ag := agent.New(model, e.systemPrompt(ra), opts...) runRes, runErr := ag.Run(runCtx, input, critic.steerOptions()...) status := statusFor(runCtx, runErr) if runRes != nil { res.Output = runRes.Output res.Usage = runRes.Usage } res.Steps = emitter.snapshot() res.Err = runErr e.finishAudit(ctx, rec, status, res, started, runErr) if e.cfg.Ports.Budget != nil { e.cfg.Ports.Budget.Commit(detach(ctx), inv.CallerID, time.Since(started).Seconds()) } e.deliver(ctx, inv, res, runErr) return res } // statusFor maps a run error to a RunStats.Status, distinguishing a critic kill // (killed), a deadline (timeout), and a cancellation (cancelled — caller cancel // or shutdown) from a generic error so audit consumers can tell them apart. The // run context's cancellation cause carries the distinction (ErrCriticKill / // DeadlineExceeded), since ctx.Err() alone only reports Canceled. func statusFor(runCtx context.Context, runErr error) string { switch { case runErr == nil: return "ok" case errors.Is(context.Cause(runCtx), ErrCriticKill): return "killed" case errors.Is(runErr, context.DeadlineExceeded) || errors.Is(context.Cause(runCtx), context.DeadlineExceeded): return "timeout" case errors.Is(runErr, context.Canceled): return "cancelled" default: return "error" } } // finishAudit writes the terminal roll-up on a detached context so a cancelled // run still records (mort's CleanupContextTimeout lesson). func (e *Executor) finishAudit(ctx context.Context, rec RunRecorder, status string, res Result, started time.Time, runErr error) { if rec == nil { return } stats := RunStats{ Status: status, Output: res.Output, ToolCalls: rec.ToolCallsCount(), RuntimeSeconds: time.Since(started).Seconds(), } if runErr != nil { stats.Error = runErr.Error() } stats.InputTokens, stats.OutputTokens, stats.ThinkingTokens = rec.TokenStats() rec.Close(detach(ctx), stats) } func (e *Executor) systemPrompt(ra RunnableAgent) string { if e.cfg.SystemHeader == "" { return ra.SystemPrompt } if ra.SystemPrompt == "" { return e.cfg.SystemHeader } return e.cfg.SystemHeader + "\n\n" + ra.SystemPrompt } // compactionThreshold returns the token threshold for the tier's model context // window (ratio × limit), or 0 when the limit is unknown. func (e *Executor) compactionThreshold(tier string) int { max := e.cfg.ContextTokens(tier) if max <= 0 { return 0 } return int(float64(max) * e.cfg.Defaults.CompactionThresholdRatio) } // deliver posts the run's output (or error) via run.Ports.Delivery when both a // Delivery and a target (inv.DeliveryID) are set. No target = the caller reads // Result.Output itself (the synchronous default). Best-effort + detached: a // delivery failure must not change the run's outcome. func (e *Executor) deliver(ctx context.Context, inv tool.Invocation, res Result, runErr error) { if e.cfg.Ports.Delivery == nil || inv.DeliveryID == "" { return } target := deliver.Target{Kind: inv.DeliveryKind, ID: inv.DeliveryID} dctx := detach(ctx) if runErr != nil { _ = e.cfg.Ports.Delivery.DeliverError(dctx, target, runErr) return } _, _ = e.cfg.Ports.Delivery.Deliver(dctx, target, res.Output, nil) } // detach derives a bounded cleanup context off ctx, detached from its // cancellation, for post-run writes. The cancel is intentionally not returned; // CleanupContextTimeout bounds the lifetime. func detach(ctx context.Context) context.Context { c, cancel := context.WithTimeout(context.WithoutCancel(ctx), CleanupContextTimeout) _ = cancel // bounded by the timeout; nothing to cancel early return c }