Files
executus/run/executor.go
T
steve 784d5d7ce4
executus CI / test (pull_request) Successful in 45s
executus CI / test (push) Successful in 1m47s
run: PostRun detached ctx + panic-isolated Cleanup (gadfly #12)
Two convergent gadfly refinements on the PostRun wiring:
- PostRun now runs on detach(ctx), not the caller's ctx — a finished/cancelled
  caller no longer aborts artifact production (3-model: glm-5.2/minimax/deepseek).
- Cleanup is panic-isolated via safeCleanup (recover+log), matching runPostRun, so
  a misbehaving teardown can't clobber an otherwise-successful run (deepseek).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 18:33:41 -04:00

443 lines
16 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package run
import (
"context"
"errors"
"fmt"
"time"
"gitea.stevedudenhoeffer.com/steve/majordomo/agent"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/executus/compact"
"gitea.stevedudenhoeffer.com/steve/executus/deliver"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
// ModelResolver resolves a tier alias or concrete spec to a usable llm.Model
// and an enriched context (for usage attribution). model.ParseModelForContext
// satisfies it.
type ModelResolver func(ctx context.Context, tier string) (context.Context, llm.Model, error)
// Defaults are the executor's fallback caps and loop guards, applied per run
// when the RunnableAgent leaves a field zero.
type Defaults struct {
MaxIterations int // tool-dispatch steps; default 12
MaxRuntime time.Duration // wall-clock per run; default 60s
FallbackTier string // tier when the agent's is empty; default "fast"
MaxConsecutiveToolErrors int // loop guard; default 3
MaxSameToolCallRepeats int // retry-storm guard; default 3
CompactionThresholdRatio float64 // fraction of model context to compact at; default 0.7
CriticSoftTimeout time.Duration // idle window before the critic wakes; default 90s
}
func (d Defaults) withFallbacks() Defaults {
if d.MaxIterations <= 0 {
d.MaxIterations = 12
}
if d.MaxRuntime <= 0 {
d.MaxRuntime = 60 * time.Second
}
if d.FallbackTier == "" {
d.FallbackTier = "fast"
}
if d.MaxConsecutiveToolErrors <= 0 {
d.MaxConsecutiveToolErrors = 3
}
if d.MaxSameToolCallRepeats <= 0 {
d.MaxSameToolCallRepeats = 3
}
if d.CompactionThresholdRatio <= 0 {
d.CompactionThresholdRatio = 0.7
}
if d.CriticSoftTimeout <= 0 {
d.CriticSoftTimeout = 90 * time.Second
}
return d
}
// Config wires an Executor. Registry + Models are required; everything else is
// optional and nil-safe — the zero Config beyond those yields a bounded,
// in-memory run with no persistence/audit/budget/critic/delegation/compaction
// (gadfly's case).
type Config struct {
Registry tool.Registry
Models ModelResolver
Defaults Defaults
Ports Ports
// Compactor mints the per-run context-compaction hook. nil disables
// compaction. ContextTokens resolves a tier's model context-window (for
// the compaction threshold); nil — or a zero return — also disables it.
Compactor compact.CompactorFactory
ContextTokens func(tier string) int
// SystemHeader is an optional platform header prepended to every agent's
// system prompt.
SystemHeader string
}
// Executor runs a RunnableAgent through majordomo's agent loop with the wired
// Ports. Construct with New; safe for concurrent use across runs.
type Executor struct {
cfg Config
}
// New builds an Executor. It panics if Registry or Models is nil — those are
// structural, not runtime, errors.
func New(cfg Config) *Executor {
if cfg.Registry == nil || cfg.Models == nil {
panic("run.New: Registry and Models are required")
}
cfg.Defaults = cfg.Defaults.withFallbacks()
return &Executor{cfg: cfg}
}
// Result is one run's outcome. Err carries the run failure (if any); the other
// fields are populated best-effort even on error (partial output/steps/usage).
type Result struct {
RunID string
Output string
Steps []tool.Step
Usage llm.Usage
Err error
// PostRunResult carries artifacts produced by a SessionToolFactory's PostRun
// hook (rendered images, files). nil when no factory was set or PostRun
// returned nil. The host delivers these (e.g. mort's chat API / Discord).
PostRunResult *tool.PostRunResult
}
// Run executes ra with the given invocation + input and returns the Result. It
// never propagates a panic; failures surface in Result.Err (a top-level recover
// converts any panic — including from a host Port — into a run error).
func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocation, input string) (res Result) {
started := time.Now()
res = Result{RunID: inv.RunID}
// Enforce the no-panic contract: a panic anywhere in the run (incl. a host
// Critic/Audit/Palette callback on the main goroutine) becomes Result.Err
// rather than unwinding into the caller.
defer func() {
if r := recover(); r != nil {
res.Err = fmt.Errorf("run.Executor: recovered panic: %v", r)
}
}()
tier := ra.ModelTier
if tier == "" {
tier = e.cfg.Defaults.FallbackTier
}
maxIter := ra.MaxIterations
if maxIter <= 0 {
maxIter = e.cfg.Defaults.MaxIterations
}
maxRuntime := ra.MaxRuntime
if maxRuntime <= 0 {
maxRuntime = e.cfg.Defaults.MaxRuntime
}
// Budget gate (pre-run): a rejected run makes no model call.
if e.cfg.Ports.Budget != nil {
if err := e.cfg.Ports.Budget.Check(ctx, inv.CallerID); err != nil {
res.Err = err
return res
}
}
// Resolve the model (enriches ctx for usage attribution).
modelCtx, model, err := e.cfg.Models(ctx, tier)
if err != nil {
res.Err = fmt.Errorf("resolve model %q: %w", tier, err)
return res
}
if model == nil {
// A resolver returning (ctx, nil, nil) would otherwise nil-panic inside
// the agent loop; surface it as a clean error (Run never panics out).
res.Err = fmt.Errorf("resolve model %q: resolver returned a nil model", tier)
return res
}
ctx = modelCtx
// Audit start (optional). The recorder satisfies RunTally; stamp it on the
// invocation so a self-status tool can read live progress.
info := RunInfo{
RunID: inv.RunID,
SubjectID: ra.ID,
Name: ra.Name,
CallerID: inv.CallerID,
ChannelID: inv.ChannelID,
ParentRunID: inv.ParentRunID,
Inputs: inv.SkillInputs,
StartedAt: started,
MaxIterations: maxIter,
}
var rec RunRecorder
var stateAcc *RunStateAccessor
if e.cfg.Ports.Audit != nil {
rec = e.cfg.Ports.Audit.StartRun(ctx, info)
}
if rec != nil {
stateAcc = NewRunStateAccessor(rec, maxIter, 0, started)
inv.RunState = stateAcc
}
// Steer mailbox: lets session tools (via inv.AttachImages) feed multimodal
// messages into the running conversation before its next step. Created BEFORE
// the toolbox build so any tool's handler captures the live AttachImages seam.
mailbox := &steerMailbox{}
inv.AttachImages = (&runSession{mailbox: mailbox}).AttachImages
// Build the toolbox from the agent's low-level tools.
toolbox, err := e.cfg.Registry.Build(ra.LowLevelTools, inv, tool.Visibility("private"), nil)
if err != nil {
res.Err = fmt.Errorf("build toolbox: %w", err)
e.finishAudit(ctx, rec, "error", res, started, res.Err)
return res
}
// Add skill__/agent__ delegation tools from the agent's palette (nil-safe:
// no PaletteSource or empty palette → no delegation tools).
if err := addDelegationTools(toolbox, ra, inv, e.cfg.Ports.Palette); err != nil {
res.Err = fmt.Errorf("build delegation tools: %w", err)
e.finishAudit(ctx, rec, "error", res, started, res.Err)
return res
}
// Per-invocation ExtraTools + a SessionToolFactory's per-run tools, added on
// top of the agent's palette. The factory closes over the live session (the
// AttachImages mailbox); its PostRun hook (held for after the run) produces
// artifacts attached to res.PostRunResult, and its Cleanup is deferred. All
// nil-safe.
for _, t := range inv.ExtraTools {
if err := toolbox.Add(t); err != nil {
res.Err = fmt.Errorf("add extra tool: %w", err)
e.finishAudit(ctx, rec, "error", res, started, res.Err)
return res
}
}
var postRun func(ctx context.Context, transcript []llm.Message, output string, runErr error) *tool.PostRunResult
if inv.SessionToolFactory != nil {
st := inv.SessionToolFactory(&runSession{mailbox: mailbox})
if st.Cleanup != nil {
defer safeCleanup(st.Cleanup) // panic-isolated, like runPostRun
}
for _, t := range st.Tools {
if err := toolbox.Add(t); err != nil {
res.Err = fmt.Errorf("add session tool: %w", err)
e.finishAudit(ctx, rec, "error", res, started, res.Err)
return res
}
}
postRun = st.PostRun
}
// Run context: bound by MaxRuntime, detached from the caller's deadline so a
// lane/queue wait doesn't eat the run budget (mort's V10 lesson). Caller
// cancellation still propagates via MergeCancellation. Created BEFORE the
// step observer so the observer forwards the merged run context (not a
// possibly-cancelled caller ctx) to OnStep consumers.
// MaxRuntime stays a WithTimeout so its DeadlineExceeded propagates through the
// child chain (→ "timeout"), preserving the run's-own-timeout vs caller-cancel
// distinction. A NESTED cause-carrying layer lets a critic kill surface as a
// distinct "killed" without disturbing that: only an ErrCriticKill cause is
// consulted in statusFor; a generic run error or a caller cancel is classified
// by the run error itself.
timeoutCtx, cancelTimeout := context.WithTimeout(context.WithoutCancel(ctx), maxRuntime)
defer cancelTimeout()
runCtx, cancelCause := context.WithCancelCause(timeoutCtx)
defer cancelCause(nil)
runCtx, mergeCancel := MergeCancellation(runCtx, ctx)
defer mergeCancel()
// Critic (optional): monitors the run for a stall, can nudge/extend/kill via
// its host Escalator. Its hard deadline is bound to runCtx (cancel on pass).
// nil-safe: no-op when no critic is configured or the agent doesn't enable it.
critic, stopCritic := e.startCritic(runCtx, cancelCause, ra, info)
defer stopCritic()
// Step instrumentation: accumulate Result.Steps + fire inv.OnStep, feed the
// audit recorder, and keep the live iteration counter fresh. majordomo's
// step observer hands us each completed iteration; we zip the model's tool
// calls with their executed results PAIRWISE — a result without a matching
// call (or a call without a result) is skipped rather than recorded as an
// empty-name "ghost" step.
emitter := newStepEmitter(inv.OnStep)
stepObserver := func(s agent.Step) {
if stateAcc != nil {
stateAcc.SetIteration(s.Index)
}
if rec != nil {
rec.OnStep(s.Index, s.Response)
}
critic.recordStep(s.Index, s.Response) // keep the critic's activity clock fresh + carry the step payload
var calls []llm.ToolCall
if s.Response != nil {
calls = s.Response.ToolCalls
}
n := len(s.Results)
if len(calls) < n {
n = len(calls)
}
for i := 0; i < n; i++ {
call, r := calls[i], s.Results[i]
critic.recordToolStart(call.Name, string(call.Arguments))
emitter.toolStart(runCtx, call.Name, call.Arguments)
emitter.toolEnd(runCtx, call, r.Content, r.IsError)
if rec != nil {
rec.OnTool(call, r.Content)
}
}
}
opts := []agent.Option{
agent.WithToolbox(toolbox),
// Step ceiling: a fixed WithMaxSteps(maxIter) normally, but when a critic is
// active it owns a DYNAMIC ceiling (WithMaxStepsFunc) so it can raise a
// healthy-but-long run's budget mid-flight. Falls back to maxIter.
critic.maxStepsOption(maxIter),
agent.WithToolErrorLimits(e.cfg.Defaults.MaxConsecutiveToolErrors, e.cfg.Defaults.MaxSameToolCallRepeats),
agent.WithStepObserver(stepObserver),
}
if e.cfg.Compactor != nil && e.cfg.ContextTokens != nil {
if threshold := e.compactionThreshold(tier); threshold > 0 {
// Forward compaction events to the audit log (makes the
// CompactionEvent doc's "logged to the run trace" promise true).
var onFire func(compact.CompactionEvent)
if rec != nil {
onFire = func(ev compact.CompactionEvent) {
rec.LogEvent("compaction_fired", map[string]any{
"messages_before": ev.MessagesBefore,
"messages_after": ev.MessagesAfter,
"tokens_before": ev.TokensBefore,
"tokens_after": ev.TokensAfter,
})
}
}
opts = append(opts, agent.WithCompactor(e.cfg.Compactor(threshold, onFire)))
}
}
ag := agent.New(model, e.systemPrompt(ra), opts...)
// One WithSteer drains BOTH the session mailbox (a tool's AttachImages) and
// the critic's nudges before each step.
steer := func() []llm.Message { return append(mailbox.drain(), critic.drainSteer()...) }
runRes, runErr := ag.Run(runCtx, input, agent.WithSteer(steer))
status := statusFor(runCtx, runErr)
if runRes != nil {
res.Output = runRes.Output
res.Usage = runRes.Usage
}
res.Steps = emitter.snapshot()
res.Err = runErr
// PostRun: hand the SessionToolFactory's hook the full transcript (populated
// even on partial results) so it can produce artifacts. Best-effort +
// panic-isolated — a PostRun failure never fails an otherwise-successful run.
if postRun != nil {
var transcript []llm.Message
if runRes != nil {
transcript = runRes.Messages
}
// Detach from the caller's ctx: a finished/cancelled caller must not abort
// artifact production (the hook owns its own bounding, per its contract).
res.PostRunResult = runPostRun(detach(ctx), postRun, transcript, res.Output, runErr)
}
e.finishAudit(ctx, rec, status, res, started, runErr)
if e.cfg.Ports.Budget != nil {
e.cfg.Ports.Budget.Commit(detach(ctx), inv.CallerID, time.Since(started).Seconds())
}
e.deliver(ctx, inv, res, runErr)
return res
}
// statusFor maps a run error to a RunStats.Status, distinguishing a critic kill
// (killed), a deadline (timeout), and a cancellation (cancelled — caller cancel
// or shutdown) from a generic error so audit consumers can tell them apart. The
// run context's cancellation cause carries the distinction (ErrCriticKill /
// DeadlineExceeded), since ctx.Err() alone only reports Canceled.
func statusFor(runCtx context.Context, runErr error) string {
switch {
case runErr == nil:
return "ok"
// Only the kill is recovered from the cancellation cause — a critic kill
// surfaces as a plain Canceled run error, so without this it'd read as
// "cancelled". Everything else is classified by the run error itself, so a
// genuine run error is never relabeled just because the context was later
// cancelled, and a caller cancel/deadline stays "cancelled" (not "timeout").
case errors.Is(context.Cause(runCtx), ErrCriticKill):
return "killed"
case errors.Is(runErr, context.DeadlineExceeded):
return "timeout"
case errors.Is(runErr, context.Canceled):
return "cancelled"
default:
return "error"
}
}
// finishAudit writes the terminal roll-up on a detached context so a cancelled
// run still records (mort's CleanupContextTimeout lesson).
func (e *Executor) finishAudit(ctx context.Context, rec RunRecorder, status string, res Result, started time.Time, runErr error) {
if rec == nil {
return
}
stats := RunStats{
Status: status,
Output: res.Output,
ToolCalls: rec.ToolCallsCount(),
RuntimeSeconds: time.Since(started).Seconds(),
}
if runErr != nil {
stats.Error = runErr.Error()
}
stats.InputTokens, stats.OutputTokens, stats.ThinkingTokens = rec.TokenStats()
rec.Close(detach(ctx), stats)
}
func (e *Executor) systemPrompt(ra RunnableAgent) string {
if e.cfg.SystemHeader == "" {
return ra.SystemPrompt
}
if ra.SystemPrompt == "" {
return e.cfg.SystemHeader
}
return e.cfg.SystemHeader + "\n\n" + ra.SystemPrompt
}
// compactionThreshold returns the token threshold for the tier's model context
// window (ratio × limit), or 0 when the limit is unknown.
func (e *Executor) compactionThreshold(tier string) int {
max := e.cfg.ContextTokens(tier)
if max <= 0 {
return 0
}
return int(float64(max) * e.cfg.Defaults.CompactionThresholdRatio)
}
// deliver posts the run's output (or error) via run.Ports.Delivery when both a
// Delivery and a target (inv.DeliveryID) are set. No target = the caller reads
// Result.Output itself (the synchronous default). Best-effort + detached: a
// delivery failure must not change the run's outcome.
func (e *Executor) deliver(ctx context.Context, inv tool.Invocation, res Result, runErr error) {
if e.cfg.Ports.Delivery == nil || inv.DeliveryID == "" {
return
}
target := deliver.Target{Kind: inv.DeliveryKind, ID: inv.DeliveryID}
dctx := detach(ctx)
if runErr != nil {
_ = e.cfg.Ports.Delivery.DeliverError(dctx, target, runErr)
return
}
_, _ = e.cfg.Ports.Delivery.Deliver(dctx, target, res.Output, nil)
}
// detach derives a bounded cleanup context off ctx, detached from its
// cancellation, for post-run writes. The cancel is intentionally not returned;
// CleanupContextTimeout bounds the lifetime.
func detach(ctx context.Context) context.Context {
c, cancel := context.WithTimeout(context.WithoutCancel(ctx), CleanupContextTimeout)
_ = cancel // bounded by the timeout; nothing to cancel early
return c
}