diff --git a/CLAUDE.md b/CLAUDE.md index 06ee6ed..86a7be9 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -43,12 +43,13 @@ CORE (majordomo + stdlib): fanout/ programmatic N×M swarm [P0 ✓] deliver/ output egress seam (+ Discard/Stdout) [P0 ✓] identity/ caller identity seams [P0 ✓] - run/ run-loop mechanics + RunnableAgent DTO + [P2 wip] - nil-safe run.Ports (Audit/Budget/Critic/ - Checkpointer/PaletteSource) + step - instrumentation (steps.go) done; the - agentexec+skillexec -> run.Executor MERGE - (consuming Ports) is the remaining P2 work [P2] + run/ run.Executor is RUNNABLE: model-resolve + [P2 core ✓] + toolbox + majordomo loop + compaction + + run-bounding (V10 detached timeout) + step/ + audit observers + Budget gate; RunnableAgent + DTO + nil-safe run.Ports. Follow-ups: wire + Critic/Checkpointer/PaletteSource/Delivery, + Phases, and the no-tools direct path [P2] dispatchguard/ loop/depth/fan-out caps [P0 ✓] pendingattach/ attachment dedupe [P0 ✓] tool/ registry + 3-stage permissions + ssrf [P1 ✓] diff --git a/README.md b/README.md index 367d652..80b53d8 100644 --- a/README.md +++ b/README.md @@ -31,15 +31,23 @@ bot) — mort and gadfly are the first two consumers (heavy and light). See [mort]: https://gitea.stevedudenhoeffer.com/steve/mort -**Available today (P0):** +**Available today:** +- `run/` — **executus is runnable.** `run.Executor` ties model resolution, the + tool registry, majordomo's agent loop, context compaction, run-bounding, and + step/audit instrumentation into one `Run(ctx, RunnableAgent, inv) Result`, with + every host concern behind a nil-safe `run.Ports` (Audit/Budget/Critic/ + Checkpointer/PaletteSource/Delivery). See `examples/minimal`. +- `model/` — config-driven tier resolution + failover over majordomo, with + pluggable `UsageSink`/`TraceSink` and `GenerateWith[T]` structured output. +- `tool/` — the tool registry + 3-stage permission model + SSRF guard. +- `compact/` — the per-run context compactor. - `lane/` — bounded worker pool with fair-share queueing (run- and provider-concurrency). - `fanout/` — programmatic N×M swarm with bounded global + per-key concurrency. -- `config/` — the host config seam (`Source`) with an env-var default. -- `deliver/` — the output-egress seam with `Discard`/`Stdout` defaults. -- `identity/` — caller-identity seams (`AdminPolicy`, `MemberResolver`). -- `dispatchguard/`, `pendingattach/`, `run/progress.go` — run-safety primitives. +- `config/`, `deliver/`, `identity/` — host seams (config / output / identity), + each with a shipped default. +- `dispatchguard/`, `pendingattach/` — run-safety primitives. ## Design diff --git a/examples/minimal/main.go b/examples/minimal/main.go index fdc8976..f184392 100644 --- a/examples/minimal/main.go +++ b/examples/minimal/main.go @@ -1,27 +1,49 @@ -// Command minimal demonstrates executus's standalone core primitives available -// today (P0): the config seam + bounded fan-out. The full zero-config "agentic -// in ~12 lines" example arrives once the model, tool, and run packages land -// (P1–P3). +// Command minimal is executus's "hello, agentic world": wire a model resolver, +// a tool registry, and the run executor, then run an agent. With no batteries +// (Audit/Budget/Critic/Checkpointer/Palette/Delivery all nil) this is a +// bounded, in-memory run — the light-host shape (gadfly's case). +// +// Run it with a provider key for the configured tier, e.g. +// +// ANTHROPIC_API_KEY=sk-... go run ./examples/minimal +// +// Override a tier from the environment without touching code, e.g. +// +// EXECUTUS_MODEL_TIER_FAST=openai/gpt-4o-mini ANTHROPIC_API_KEY= OPENAI_KEY=sk-... go run ./examples/minimal package main import ( "context" "fmt" + "log" "gitea.stevedudenhoeffer.com/steve/executus/config" - "gitea.stevedudenhoeffer.com/steve/executus/fanout" + "gitea.stevedudenhoeffer.com/steve/executus/model" + "gitea.stevedudenhoeffer.com/steve/executus/run" + "gitea.stevedudenhoeffer.com/steve/executus/tool" ) func main() { - cfg := config.Env("EXECUTUS_") // e.g. EXECUTUS_FANOUT_MAX_CONCURRENT=8 - max := cfg.Int("fanout.max_concurrent", 4) + // 1. Configure model tiers: live values come from the environment + // (EXECUTUS_MODEL_TIER_), falling back to these defaults. + model.Configure(config.Env("EXECUTUS_"), map[string]string{ + "fast": "anthropic/claude-haiku-4-5", + "thinking": "anthropic/claude-opus-4-8", + }, 0) - items := []string{"alpha", "beta", "gamma", "delta"} - results := fanout.Run(context.Background(), items, - fanout.Options[string]{MaxConcurrent: max}, - func(_ context.Context, s string) (int, error) { return len(s), nil }) + // 2. Build the executor: a tool registry + the model resolver. No batteries. + ex := run.New(run.Config{ + Registry: tool.NewRegistry(), + Models: model.ParseModelForContext, + }) - for _, r := range results { - fmt.Printf("%-6s -> %d (err=%v)\n", items[r.Index], r.Value, r.Err) + // 3. Run an agent and print its answer. + res := ex.Run(context.Background(), + run.RunnableAgent{Name: "assistant", SystemPrompt: "You are concise.", ModelTier: "fast"}, + tool.Invocation{RunID: "demo-1", CallerID: "local"}, + "In one sentence, what is an agent harness?") + if res.Err != nil { + log.Fatalf("run failed: %v", res.Err) } + fmt.Println(res.Output) } diff --git a/run/executor.go b/run/executor.go new file mode 100644 index 0000000..c1470fb --- /dev/null +++ b/run/executor.go @@ -0,0 +1,274 @@ +package run + +import ( + "context" + "fmt" + "time" + + "gitea.stevedudenhoeffer.com/steve/majordomo/agent" + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" + + "gitea.stevedudenhoeffer.com/steve/executus/compact" + "gitea.stevedudenhoeffer.com/steve/executus/tool" +) + +// ModelResolver resolves a tier alias or concrete spec to a usable llm.Model +// and an enriched context (for usage attribution). model.ParseModelForContext +// satisfies it. +type ModelResolver func(ctx context.Context, tier string) (context.Context, llm.Model, error) + +// Defaults are the executor's fallback caps and loop guards, applied per run +// when the RunnableAgent leaves a field zero. +type Defaults struct { + MaxIterations int // tool-dispatch steps; default 12 + MaxRuntime time.Duration // wall-clock per run; default 60s + FallbackTier string // tier when the agent's is empty; default "fast" + MaxConsecutiveToolErrors int // loop guard; default 3 + MaxSameToolCallRepeats int // retry-storm guard; default 3 + CompactionThresholdRatio float64 // fraction of model context to compact at; default 0.7 +} + +func (d Defaults) withFallbacks() Defaults { + if d.MaxIterations <= 0 { + d.MaxIterations = 12 + } + if d.MaxRuntime <= 0 { + d.MaxRuntime = 60 * time.Second + } + if d.FallbackTier == "" { + d.FallbackTier = "fast" + } + if d.MaxConsecutiveToolErrors <= 0 { + d.MaxConsecutiveToolErrors = 3 + } + if d.MaxSameToolCallRepeats <= 0 { + d.MaxSameToolCallRepeats = 3 + } + if d.CompactionThresholdRatio <= 0 { + d.CompactionThresholdRatio = 0.7 + } + return d +} + +// Config wires an Executor. Registry + Models are required; everything else is +// optional and nil-safe — the zero Config beyond those yields a bounded, +// in-memory run with no persistence/audit/budget/critic/delegation/compaction +// (gadfly's case). +type Config struct { + Registry tool.Registry + Models ModelResolver + Defaults Defaults + Ports Ports + + // Compactor mints the per-run context-compaction hook. nil disables + // compaction. ContextTokens resolves a tier's model context-window (for + // the compaction threshold); nil — or a zero return — also disables it. + Compactor compact.CompactorFactory + ContextTokens func(tier string) int + + // SystemHeader is an optional platform header prepended to every agent's + // system prompt. + SystemHeader string +} + +// Executor runs a RunnableAgent through majordomo's agent loop with the wired +// Ports. Construct with New; safe for concurrent use across runs. +type Executor struct { + cfg Config +} + +// New builds an Executor. It panics if Registry or Models is nil — those are +// structural, not runtime, errors. +func New(cfg Config) *Executor { + if cfg.Registry == nil || cfg.Models == nil { + panic("run.New: Registry and Models are required") + } + cfg.Defaults = cfg.Defaults.withFallbacks() + return &Executor{cfg: cfg} +} + +// Result is one run's outcome. Err carries the run failure (if any); the other +// fields are populated best-effort even on error (partial output/steps/usage). +type Result struct { + RunID string + Output string + Steps []tool.Step + Usage llm.Usage + Err error +} + +// Run executes ra with the given invocation + input and returns the Result. It +// never propagates a panic; failures surface in Result.Err. +func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocation, input string) Result { + started := time.Now() + res := Result{RunID: inv.RunID} + + tier := ra.ModelTier + if tier == "" { + tier = e.cfg.Defaults.FallbackTier + } + maxIter := ra.MaxIterations + if maxIter <= 0 { + maxIter = e.cfg.Defaults.MaxIterations + } + maxRuntime := ra.MaxRuntime + if maxRuntime <= 0 { + maxRuntime = e.cfg.Defaults.MaxRuntime + } + + // Budget gate (pre-run): a rejected run makes no model call. + if e.cfg.Ports.Budget != nil { + if err := e.cfg.Ports.Budget.Check(ctx, inv.CallerID); err != nil { + res.Err = err + return res + } + } + + // Resolve the model (enriches ctx for usage attribution). + modelCtx, model, err := e.cfg.Models(ctx, tier) + if err != nil { + res.Err = fmt.Errorf("resolve model %q: %w", tier, err) + return res + } + ctx = modelCtx + + // Audit start (optional). The recorder satisfies RunTally; stamp it on the + // invocation so a self-status tool can read live progress. + var rec RunRecorder + if e.cfg.Ports.Audit != nil { + rec = e.cfg.Ports.Audit.StartRun(ctx, RunInfo{ + RunID: inv.RunID, + SubjectID: ra.ID, + Name: ra.Name, + CallerID: inv.CallerID, + ChannelID: inv.ChannelID, + ParentRunID: inv.ParentRunID, + Inputs: inv.SkillInputs, + StartedAt: started, + }) + } + if rec != nil { + inv.RunState = NewRunStateAccessor(rec, maxIter, 0, started) + } + + // Build the toolbox from the agent's low-level tools. + toolbox, err := e.cfg.Registry.Build(ra.LowLevelTools, inv, tool.Visibility("private"), nil) + if err != nil { + res.Err = fmt.Errorf("build toolbox: %w", err) + e.finishAudit(ctx, rec, "error", res, started, res.Err) + return res + } + + // Step instrumentation: accumulate Result.Steps + fire inv.OnStep, and feed + // the audit recorder. majordomo's step observer hands us each completed + // iteration; we zip the model's tool calls with their executed results. + emitter := newStepEmitter(inv.OnStep) + stepObserver := func(s agent.Step) { + if rec != nil { + rec.OnStep(s.Index, s.Response) + } + var calls []llm.ToolCall + if s.Response != nil { + calls = s.Response.ToolCalls + } + for i, r := range s.Results { + var call llm.ToolCall + if i < len(calls) { + call = calls[i] + } + emitter.toolStart(ctx, call.Name, call.Arguments) + emitter.toolEnd(ctx, call, r.Content, r.IsError) + if rec != nil { + rec.OnTool(call, r.Content) + } + } + } + + // Run context: bound by MaxRuntime, detached from the caller's deadline so a + // lane/queue wait doesn't eat the run budget (mort's V10 lesson). Caller + // cancellation still propagates via MergeCancellation. + runCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), maxRuntime) + defer cancel() + runCtx, mergeCancel := MergeCancellation(runCtx, ctx) + defer mergeCancel() + + opts := []agent.Option{ + agent.WithToolbox(toolbox), + agent.WithMaxSteps(maxIter), + agent.WithToolErrorLimits(e.cfg.Defaults.MaxConsecutiveToolErrors, e.cfg.Defaults.MaxSameToolCallRepeats), + agent.WithStepObserver(stepObserver), + } + if e.cfg.Compactor != nil && e.cfg.ContextTokens != nil { + if threshold := e.compactionThreshold(tier); threshold > 0 { + opts = append(opts, agent.WithCompactor(e.cfg.Compactor(threshold, nil))) + } + } + + ag := agent.New(model, e.systemPrompt(ra), opts...) + runRes, runErr := ag.Run(runCtx, input) + + status := "ok" + if runErr != nil { + status = "error" + } + if runRes != nil { + res.Output = runRes.Output + res.Usage = runRes.Usage + } + res.Steps = emitter.snapshot() + res.Err = runErr + + e.finishAudit(ctx, rec, status, res, started, runErr) + if e.cfg.Ports.Budget != nil { + e.cfg.Ports.Budget.Commit(detach(ctx), inv.CallerID, time.Since(started).Seconds()) + } + return res +} + +// finishAudit writes the terminal roll-up on a detached context so a cancelled +// run still records (mort's CleanupContextTimeout lesson). +func (e *Executor) finishAudit(ctx context.Context, rec RunRecorder, status string, res Result, started time.Time, runErr error) { + if rec == nil { + return + } + stats := RunStats{ + Status: status, + Output: res.Output, + ToolCalls: rec.ToolCallsCount(), + RuntimeSeconds: time.Since(started).Seconds(), + } + if runErr != nil { + stats.Error = runErr.Error() + } + stats.InputTokens, stats.OutputTokens, stats.ThinkingTokens = rec.TokenStats() + rec.Close(detach(ctx), stats) +} + +func (e *Executor) systemPrompt(ra RunnableAgent) string { + if e.cfg.SystemHeader == "" { + return ra.SystemPrompt + } + if ra.SystemPrompt == "" { + return e.cfg.SystemHeader + } + return e.cfg.SystemHeader + "\n\n" + ra.SystemPrompt +} + +// compactionThreshold returns the token threshold for the tier's model context +// window (ratio × limit), or 0 when the limit is unknown. +func (e *Executor) compactionThreshold(tier string) int { + max := e.cfg.ContextTokens(tier) + if max <= 0 { + return 0 + } + return int(float64(max) * e.cfg.Defaults.CompactionThresholdRatio) +} + +// detach derives a bounded cleanup context off ctx, detached from its +// cancellation, for post-run writes. The cancel is intentionally not returned; +// CleanupContextTimeout bounds the lifetime. +func detach(ctx context.Context) context.Context { + c, cancel := context.WithTimeout(context.WithoutCancel(ctx), CleanupContextTimeout) + _ = cancel // bounded by the timeout; nothing to cancel early + return c +} diff --git a/run/executor_test.go b/run/executor_test.go new file mode 100644 index 0000000..dc4101e --- /dev/null +++ b/run/executor_test.go @@ -0,0 +1,132 @@ +package run + +import ( + "context" + "errors" + "testing" + + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" + "gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake" + + "gitea.stevedudenhoeffer.com/steve/executus/tool" +) + +// fakeModels returns a ModelResolver backed by a fake provider scripted to +// reply with the given text (no tool calls — the loop terminates immediately). +func fakeModels(t *testing.T, reply string) ModelResolver { + t.Helper() + fp := fake.New("fake") + fp.Enqueue("test-model", fake.Reply(reply)) + m, err := fp.Model("test-model") + if err != nil { + t.Fatalf("fake model: %v", err) + } + return func(ctx context.Context, _ string) (context.Context, llm.Model, error) { + return ctx, m, nil + } +} + +// TestExecutorRunHelloWorld is the milestone: executus runs an agent end-to-end +// against the fake provider and returns its output. Proves the kernel is +// runnable with the zero Ports (no persistence/audit/budget/critic). +func TestExecutorRunHelloWorld(t *testing.T) { + ex := New(Config{ + Registry: tool.NewRegistry(), + Models: fakeModels(t, "hello from executus"), + }) + + res := ex.Run(context.Background(), + RunnableAgent{Name: "greeter", SystemPrompt: "be brief", ModelTier: "test-model"}, + tool.Invocation{RunID: "run-1", CallerID: "caller-1"}, + "say hi") + + if res.Err != nil { + t.Fatalf("run error: %v", res.Err) + } + if res.Output != "hello from executus" { + t.Fatalf("output = %q, want %q", res.Output, "hello from executus") + } + if res.RunID != "run-1" { + t.Errorf("RunID = %q, want run-1", res.RunID) + } +} + +// TestExecutorBudgetRejection: a Budget that denies makes no model call. +func TestExecutorBudgetRejection(t *testing.T) { + denied := errors.New("over budget") + var modelCalled bool + models := func(ctx context.Context, _ string) (context.Context, llm.Model, error) { + modelCalled = true + return ctx, nil, nil + } + ex := New(Config{ + Registry: tool.NewRegistry(), + Models: models, + Ports: Ports{Budget: budgetFunc{check: func(string) error { return denied }}}, + }) + + res := ex.Run(context.Background(), + RunnableAgent{ModelTier: "test-model"}, + tool.Invocation{RunID: "r", CallerID: "broke"}, "hi") + + if !errors.Is(res.Err, denied) { + t.Fatalf("err = %v, want budget denial", res.Err) + } + if modelCalled { + t.Error("model must not be resolved/called when budget denies") + } +} + +// TestExecutorAuditWiring: the Audit port receives StartRun + Close with the +// terminal status/output. +func TestExecutorAuditWiring(t *testing.T) { + rec := &captureRecorder{} + ex := New(Config{ + Registry: tool.NewRegistry(), + Models: fakeModels(t, "done"), + Ports: Ports{Audit: auditFunc{start: func(RunInfo) RunRecorder { return rec }}}, + }) + + res := ex.Run(context.Background(), + RunnableAgent{ModelTier: "test-model"}, + tool.Invocation{RunID: "r2", CallerID: "c"}, "go") + + if res.Err != nil { + t.Fatalf("run error: %v", res.Err) + } + if !rec.closed { + t.Fatal("recorder.Close was not called") + } + if rec.stats.Status != "ok" { + t.Errorf("close status = %q, want ok", rec.stats.Status) + } + if rec.stats.Output != "done" { + t.Errorf("close output = %q, want done", rec.stats.Output) + } +} + +// --- test doubles --- + +type budgetFunc struct{ check func(callerID string) error } + +func (b budgetFunc) Check(_ context.Context, callerID string) error { return b.check(callerID) } +func (b budgetFunc) Commit(context.Context, string, float64) {} + +type auditFunc struct{ start func(RunInfo) RunRecorder } + +func (a auditFunc) StartRun(_ context.Context, info RunInfo) RunRecorder { return a.start(info) } + +type captureRecorder struct { + closed bool + stats RunStats + steps int + tools int +} + +func (r *captureRecorder) TokenStats() (in, out, thinking int64) { return 0, 0, 0 } +func (r *captureRecorder) ToolCallsCount() int { return r.tools } +func (r *captureRecorder) OnStep(int, *llm.Response) { r.steps++ } +func (r *captureRecorder) OnTool(llm.ToolCall, string) { r.tools++ } +func (r *captureRecorder) LogEvent(string, map[string]any) {} +func (r *captureRecorder) LogError(string) {} +func (r *captureRecorder) Close(_ context.Context, s RunStats) { r.closed = true; r.stats = s }