Files
executus/run/executor.go
T
steve e76eed0011
executus CI / test (pull_request) Successful in 1m0s
Adversarial Review (Gadfly) / review (pull_request) Successful in 28m58s
P2: run.Executor — executus is runnable
The capstone of the run kernel: run.Executor.Run(ctx, RunnableAgent, inv)
ties model resolution + the tool registry + majordomo's agent loop +
context compaction + run-bounding + step/audit instrumentation into one
path, with every host concern behind the nil-safe run.Ports.

- run/executor.go: New(Config{Registry, Models, Defaults, Ports, Compactor,
  ContextTokens, SystemHeader}) + Run -> Result{RunID, Output, Steps, Usage,
  Err}. Budget gate (pre-run), model resolve, Audit StartRun/recorder
  (satisfies RunTally, stamped on inv.RunState), toolbox build, step observer
  (zips tool calls/results -> emitter + recorder.OnStep/OnTool), V10
  detached-MaxRuntime context with caller-cancel merged back, compaction wired
  from ContextTokens×ratio, audit Close + Budget Commit on a detached cleanup
  ctx. Zero Ports = a bounded in-memory run (gadfly's case).
- run/executor_test.go: hermetic end-to-end run against majordomo's fake
  provider (hello-world), Budget-rejection (no model call), Audit-port wiring
  (StartRun + Close with terminal status/output). All green under -race.
- examples/minimal upgraded to the real "hello, agentic world" (~15 lines:
  Configure tiers -> run.New -> Run -> print). README/CLAUDE.md updated.

Remaining P2 follow-ups (incremental): wire Critic/Checkpointer/PaletteSource/
Delivery into the loop, multi-phase Pipelines, and the no-tools direct path.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-26 20:45:10 -04:00

275 lines
8.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package run
import (
"context"
"fmt"
"time"
"gitea.stevedudenhoeffer.com/steve/majordomo/agent"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/executus/compact"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
// ModelResolver resolves a tier alias or concrete model spec to a usable
// llm.Model. It also returns a derived context (enriched for usage
// attribution); the executor adopts that context for the remainder of the run.
// A non-nil error aborts the run before any model call is made.
//
// model.ParseModelForContext satisfies it.
type ModelResolver func(ctx context.Context, tier string) (context.Context, llm.Model, error)
// Defaults are the executor's fallback caps and loop guards. New fills any
// zero (or negative) field with the default noted beside it.
//
// MaxIterations, MaxRuntime and FallbackTier are per-agent overridable: Run
// uses the RunnableAgent's value when it is set and falls back to these
// otherwise. The remaining fields are executor-wide and always come from here.
type Defaults struct {
	MaxIterations            int           // tool-dispatch steps; default 12
	MaxRuntime               time.Duration // wall-clock per run; default 60s
	FallbackTier             string        // tier when the agent's is empty; default "fast"
	MaxConsecutiveToolErrors int           // loop guard; default 3
	MaxSameToolCallRepeats   int           // retry-storm guard; default 3
	CompactionThresholdRatio float64       // fraction of model context to compact at; default 0.7
}
// withFallbacks returns a copy of d in which every unset field has been
// replaced by its built-in default. Numeric fields count as unset when they
// are zero or negative; FallbackTier counts as unset when it is empty. The
// receiver is a value, so the caller's Defaults is left untouched.
func (d Defaults) withFallbacks() Defaults {
	const (
		fallbackIterations = 12
		fallbackRuntime    = 60 * time.Second
		fallbackTier       = "fast"
		fallbackToolErrors = 3
		fallbackRepeats    = 3
		fallbackRatio      = 0.7
	)

	filled := d
	if filled.MaxIterations < 1 {
		filled.MaxIterations = fallbackIterations
	}
	if filled.MaxRuntime < 1 {
		filled.MaxRuntime = fallbackRuntime
	}
	if len(filled.FallbackTier) == 0 {
		filled.FallbackTier = fallbackTier
	}
	if filled.MaxConsecutiveToolErrors < 1 {
		filled.MaxConsecutiveToolErrors = fallbackToolErrors
	}
	if filled.MaxSameToolCallRepeats < 1 {
		filled.MaxSameToolCallRepeats = fallbackRepeats
	}
	// Keep the "<= 0" form for the float: it leaves a NaN ratio untouched,
	// exactly like the integer-style guard would not.
	if filled.CompactionThresholdRatio <= 0 {
		filled.CompactionThresholdRatio = fallbackRatio
	}
	return filled
}
// Config wires an Executor. Registry + Models are required (New panics
// without them); everything else is optional and nil-safe — the zero Config
// beyond those yields a bounded, in-memory run with no
// persistence/audit/budget/critic/delegation/compaction (gadfly's case).
type Config struct {
	// Registry builds the per-run toolbox from the agent's low-level tools.
	Registry tool.Registry
	// Models resolves the run's tier to a model and an enriched context.
	Models ModelResolver
	// Defaults supplies fallback caps and loop guards; New fills unset fields.
	Defaults Defaults
	// Ports carries the host integrations. Run checks Ports.Budget and
	// Ports.Audit for nil before using them.
	Ports Ports

	// Compactor mints the per-run context-compaction hook. nil disables
	// compaction. ContextTokens resolves a tier's model context-window (for
	// the compaction threshold); nil — or a non-positive return — also
	// disables it.
	Compactor     compact.CompactorFactory
	ContextTokens func(tier string) int

	// SystemHeader is an optional platform header prepended to every agent's
	// system prompt, separated from it by a blank line.
	SystemHeader string
}
// Executor runs a RunnableAgent through majordomo's agent loop with the wired
// Ports. Construct with New; safe for concurrent use across runs (the methods
// in this file only read cfg after construction).
type Executor struct {
	cfg Config // copied by New, with Defaults already filled in
}
// New builds an Executor from cfg, filling unset Defaults fields with their
// built-in fallbacks. The Executor keeps its own copy of cfg.
//
// It panics if Registry or Models is nil — those are structural wiring
// mistakes, not runtime errors.
func New(cfg Config) *Executor {
	wired := cfg.Registry != nil && cfg.Models != nil
	if !wired {
		panic("run.New: Registry and Models are required")
	}

	exec := &Executor{cfg: cfg}
	exec.cfg.Defaults = exec.cfg.Defaults.withFallbacks()
	return exec
}
// Result is one run's outcome. Err carries the run failure (if any); the other
// fields are populated best-effort even on error (partial output/steps/usage).
// When a run is rejected before the agent loop starts (budget gate, model
// resolution, toolbox build) only RunID and Err are set.
type Result struct {
	RunID  string      // copied from the invocation's RunID
	Output string      // final output reported by the agent loop, if any
	Steps  []tool.Step // tool steps recorded by the step emitter during the loop
	Usage  llm.Usage   // usage reported by the agent loop, if any
	Err    error       // nil on success
}
// Run executes ra with the given invocation + input and returns the Result.
//
// Sequence:
//  1. Resolve the per-run tier / iteration cap / runtime cap (agent value, or
//     the executor Defaults when the agent leaves it unset).
//  2. Budget gate (if a Budget port is wired): a rejection returns immediately,
//     with no model call.
//  3. Resolve the model; the resolver's enriched context replaces ctx.
//  4. Start the audit record (if an Audit port is wired) and expose it to tools
//     through inv.RunState.
//  5. Build the toolbox and run majordomo's agent loop under a MaxRuntime
//     bound that is detached from the caller's deadline but still honours the
//     caller's cancellation.
//  6. Close the audit record and commit the elapsed runtime to the budget.
//
// Run never propagates a panic raised on the calling goroutine: it is
// recovered, reported as Result.Err ("run panicked: ..."), and any audit close
// or budget commit still owed is attempted before returning. Ordinary failures
// likewise surface in Result.Err.
func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocation, input string) (res Result) {
	started := time.Now()
	res = Result{RunID: inv.RunID}

	// Declared up front so the panic handler can tell which terminal writes
	// are still owed at the point the panic happened.
	var (
		rec         RunRecorder
		auditClosed bool // finishAudit has been (or is being) called
		loopStarted bool // the agent loop was entered, so runtime is billable
		committed   bool // Budget.Commit has been (or is being) called
	)
	defer func() {
		p := recover()
		if p == nil {
			return
		}
		res.Err = fmt.Errorf("run panicked: %v", p)

		// The cleanup below calls host ports again; swallow a second panic so
		// the no-panic contract holds even when a port is the culprit.
		defer func() { _ = recover() }()
		if !auditClosed {
			auditClosed = true
			e.finishAudit(ctx, rec, "error", res, started, res.Err)
		}
		if loopStarted && !committed && e.cfg.Ports.Budget != nil {
			committed = true
			e.cfg.Ports.Budget.Commit(detach(ctx), inv.CallerID, time.Since(started).Seconds())
		}
	}()

	// Per-run limits: the agent's own value wins, Defaults fill the gaps.
	tier := ra.ModelTier
	if tier == "" {
		tier = e.cfg.Defaults.FallbackTier
	}
	maxIter := ra.MaxIterations
	if maxIter <= 0 {
		maxIter = e.cfg.Defaults.MaxIterations
	}
	maxRuntime := ra.MaxRuntime
	if maxRuntime <= 0 {
		maxRuntime = e.cfg.Defaults.MaxRuntime
	}

	// Budget gate (pre-run): a rejected run makes no model call.
	if e.cfg.Ports.Budget != nil {
		if err := e.cfg.Ports.Budget.Check(ctx, inv.CallerID); err != nil {
			res.Err = err
			return res
		}
	}

	// Resolve the model (enriches ctx for usage attribution).
	modelCtx, model, err := e.cfg.Models(ctx, tier)
	if err != nil {
		res.Err = fmt.Errorf("resolve model %q: %w", tier, err)
		return res
	}
	ctx = modelCtx

	// Audit start (optional). The recorder satisfies RunTally; stamp it on the
	// invocation so a self-status tool can read live progress.
	if e.cfg.Ports.Audit != nil {
		rec = e.cfg.Ports.Audit.StartRun(ctx, RunInfo{
			RunID:       inv.RunID,
			SubjectID:   ra.ID,
			Name:        ra.Name,
			CallerID:    inv.CallerID,
			ChannelID:   inv.ChannelID,
			ParentRunID: inv.ParentRunID,
			Inputs:      inv.SkillInputs,
			StartedAt:   started,
		})
	}
	if rec != nil {
		inv.RunState = NewRunStateAccessor(rec, maxIter, 0, started)
	}

	// Build the toolbox from the agent's low-level tools.
	toolbox, err := e.cfg.Registry.Build(ra.LowLevelTools, inv, tool.Visibility("private"), nil)
	if err != nil {
		res.Err = fmt.Errorf("build toolbox: %w", err)
		auditClosed = true
		e.finishAudit(ctx, rec, "error", res, started, res.Err)
		return res
	}

	// Step instrumentation: accumulate Result.Steps + fire inv.OnStep, and feed
	// the audit recorder. majordomo's step observer hands us each completed
	// iteration; we zip the model's tool calls with their executed results.
	emitter := newStepEmitter(inv.OnStep)
	stepObserver := func(s agent.Step) {
		if rec != nil {
			rec.OnStep(s.Index, s.Response)
		}
		var calls []llm.ToolCall
		if s.Response != nil {
			calls = s.Response.ToolCalls
		}
		for i, r := range s.Results {
			// A result without a matching call is still reported, against a
			// zero-value ToolCall.
			var call llm.ToolCall
			if i < len(calls) {
				call = calls[i]
			}
			emitter.toolStart(ctx, call.Name, call.Arguments)
			emitter.toolEnd(ctx, call, r.Content, r.IsError)
			if rec != nil {
				rec.OnTool(call, r.Content)
			}
		}
	}

	// Run context: bound by MaxRuntime, detached from the caller's deadline so a
	// lane/queue wait doesn't eat the run budget (mort's V10 lesson). Caller
	// cancellation still propagates via MergeCancellation.
	runCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), maxRuntime)
	defer cancel()
	runCtx, mergeCancel := MergeCancellation(runCtx, ctx)
	defer mergeCancel()

	opts := []agent.Option{
		agent.WithToolbox(toolbox),
		agent.WithMaxSteps(maxIter),
		agent.WithToolErrorLimits(e.cfg.Defaults.MaxConsecutiveToolErrors, e.cfg.Defaults.MaxSameToolCallRepeats),
		agent.WithStepObserver(stepObserver),
	}
	// Compaction needs both the factory and a known context window.
	if e.cfg.Compactor != nil && e.cfg.ContextTokens != nil {
		if threshold := e.compactionThreshold(tier); threshold > 0 {
			opts = append(opts, agent.WithCompactor(e.cfg.Compactor(threshold, nil)))
		}
	}

	ag := agent.New(model, e.systemPrompt(ra), opts...)
	loopStarted = true
	runRes, runErr := ag.Run(runCtx, input)

	status := "ok"
	if runErr != nil {
		status = "error"
	}
	// Partial output/usage is kept even when the loop returned an error.
	if runRes != nil {
		res.Output = runRes.Output
		res.Usage = runRes.Usage
	}
	res.Steps = emitter.snapshot()
	res.Err = runErr

	// Flags are set before the calls so a panicking port is not invoked twice.
	auditClosed = true
	e.finishAudit(ctx, rec, status, res, started, runErr)
	if e.cfg.Ports.Budget != nil {
		committed = true
		e.cfg.Ports.Budget.Commit(detach(ctx), inv.CallerID, time.Since(started).Seconds())
	}
	return res
}
// finishAudit closes the run's audit record with its terminal roll-up: status,
// output, tool-call count, elapsed seconds since started, the error text (when
// runErr is non-nil) and the recorder's token counters. It is a no-op when no
// recorder is wired.
//
// The Close call runs on a detached context so a cancelled run still records
// (mort's CleanupContextTimeout lesson).
func (e *Executor) finishAudit(ctx context.Context, rec RunRecorder, status string, res Result, started time.Time, runErr error) {
	if rec == nil {
		return
	}

	toolCalls := rec.ToolCallsCount()
	elapsed := time.Since(started).Seconds()

	summary := RunStats{
		Status:         status,
		Output:         res.Output,
		ToolCalls:      toolCalls,
		RuntimeSeconds: elapsed,
	}
	if runErr != nil {
		summary.Error = runErr.Error()
	}

	inTok, outTok, thinkTok := rec.TokenStats()
	summary.InputTokens = inTok
	summary.OutputTokens = outTok
	summary.ThinkingTokens = thinkTok

	cleanupCtx := detach(ctx)
	rec.Close(cleanupCtx, summary)
}
// systemPrompt composes the prompt handed to the agent loop: the executor's
// SystemHeader, a blank line, then the agent's own SystemPrompt. When either
// part is empty the other is returned unchanged (no separator is added).
func (e *Executor) systemPrompt(ra RunnableAgent) string {
	header, body := e.cfg.SystemHeader, ra.SystemPrompt
	switch {
	case header == "":
		return body
	case body == "":
		return header
	default:
		return header + "\n\n" + body
	}
}
// compactionThreshold returns the token count at which compaction should kick
// in for tier: the tier's context window (from cfg.ContextTokens) scaled by
// Defaults.CompactionThresholdRatio and truncated to an int. It returns 0 when
// the window is unknown (non-positive).
//
// cfg.ContextTokens must be non-nil; the caller checks that before calling.
func (e *Executor) compactionThreshold(tier string) int {
	window := e.cfg.ContextTokens(tier)
	if window > 0 {
		return int(float64(window) * e.cfg.Defaults.CompactionThresholdRatio)
	}
	return 0
}
// detach derives a cleanup context for post-run writes: it keeps ctx's values
// but not its cancellation or deadline, and expires on its own after
// CleanupContextTimeout.
//
// The cancel function is intentionally dropped rather than returned, so the
// context's resources are released only when that timeout fires.
func detach(ctx context.Context) context.Context {
	uncancelled := context.WithoutCancel(ctx)
	cleanupCtx, release := context.WithTimeout(uncancelled, CleanupContextTimeout)
	_ = release // bounded by the timeout; callers have nothing to cancel early
	return cleanupCtx
}