diff --git a/.gitea/workflows/adversarial-review.yml b/.gitea/workflows/adversarial-review.yml index ec1c21a..9dd178e 100644 --- a/.gitea/workflows/adversarial-review.yml +++ b/.gitea/workflows/adversarial-review.yml @@ -1,10 +1,11 @@ # Gadfly — agentic adversarial PR reviewer (https://gitea.stevedudenhoeffer.com/steve/gadfly). # -# Runs the published Gadfly image (pinned to :v1) as a specialist swarm and posts +# Runs the published Gadfly image (pinned to an immutable :sha- tag — act_runner +# caches :latest, and this build is what carries foreman provider-type support) +# as a specialist swarm and posts # ONE consolidated review comment as gitea-actions. Advisory only — never blocks a -# merge. This reviews executus PRs (same setup as mort: m1/m5 locals + 2 cloud, -# 3-lens suite). Gadfly is a simple system — treat its findings as advisory and -# double-check before acting. +# merge. This reviews executus PRs with 3 ollama-cloud models (3-lens suite). Gadfly +# is a simple system — findings are advisory; always double-check before acting. name: Adversarial Review (Gadfly) @@ -40,24 +41,21 @@ jobs: || github.actor == 'fizi' || github.actor == 'dazed')) runs-on: ubuntu-latest - # Full fleet (2 cloud + 2 local Macs, all running concurrently) reviewing - # every PR with the 3-lens suite — the slow local lanes dominate wall time. - timeout-minutes: 90 + # 3 cloud models, all concurrent, 3-lens suite. ~12 min typical. + timeout-minutes: 30 steps: - - uses: docker://gitea.stevedudenhoeffer.com/steve/gadfly:v1 + - uses: docker://gitea.stevedudenhoeffer.com/steve/gadfly:sha-6e3a83c env: GITEA_API: ${{ github.server_url }}/api/v1/repos/${{ github.repository }} GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }} OLLAMA_CLOUD_API_KEY: ${{ secrets.OLLAMA_CLOUD_API_KEY }} - # Local Ollama boxes (each its own lane, cap 1). NOTE: both Macs must be - # awake/reachable for their reviews to run; if a box is offline, that - # model's comment shows an error and the others still post. - GADFLY_ENDPOINT_M1PRO: "ollama|http://192.168.0.175:11434" - GADFLY_ENDPOINT_M5MAX: "ollama|http://192.168.0.173:11434" - # 2 cloud (parallel) + M1 Pro + M5 Max — one consolidated comment each. - GADFLY_MODELS: "minimax-m3:cloud,deepseek-v4-flash:cloud,m1pro/qwen3:14b,m5max/qwen3.6:35b-mlx" - # cloud runs 2 at once; each Mac one at a time; all three lanes parallel. - GADFLY_PROVIDER_CONCURRENCY: "ollama-cloud=2,m1pro=1,m5max=1" + # executus uses CLOUD MODELS ONLY. The local Macs (m1/m5) were dropped: + # on a P2-review measurement they took 26–29 min (with lens timeouts) + # and contributed ZERO real findings — the two cloud models found every + # genuine bug in 6–12 min. Cloud-only is faster AND higher-signal. + # 3 cloud models, one consolidated comment each, all run in parallel. + GADFLY_MODELS: "minimax-m3:cloud,deepseek-v4-flash:cloud,glm-5.2:cloud" + GADFLY_PROVIDER_CONCURRENCY: "ollama-cloud=3" # Default => the 3-lens suite (security, correctness, error-handling). # Set the repo var GADFLY_SPECIALISTS to override (csv / "all" / "auto"). GADFLY_SPECIALISTS: ${{ vars.GADFLY_SPECIALISTS || 'security,correctness,error-handling' }} diff --git a/CLAUDE.md b/CLAUDE.md index dfade99..86a7be9 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -43,8 +43,13 @@ CORE (majordomo + stdlib): fanout/ programmatic N×M swarm [P0 ✓] deliver/ output egress seam (+ Discard/Stdout) [P0 ✓] identity/ caller identity seams [P0 ✓] - run/ progress bridge now; the executor kernel + [P0 partial] - nil-safe Ports + RunnableAgent later [P2] + run/ run.Executor is RUNNABLE: model-resolve + [P2 core ✓] + toolbox + majordomo loop + compaction + + run-bounding (V10 detached timeout) + step/ + audit observers + Budget gate; RunnableAgent + DTO + nil-safe run.Ports. Follow-ups: wire + Critic/Checkpointer/PaletteSource/Delivery, + Phases, and the no-tools direct path [P2] dispatchguard/ loop/depth/fan-out caps [P0 ✓] pendingattach/ attachment dedupe [P0 ✓] tool/ registry + 3-stage permissions + ssrf [P1 ✓] @@ -52,7 +57,7 @@ CORE (majordomo + stdlib): (convar->config.Source; UsageSink/TraceSink seams; GenerateWith[T] structured output — no separate structured/ pkg) llmmeta/ shared meta-LLM helper over model/ [P1 ✓] - compact/ context compactor (WithCompactor hook) [P2] + compact/ context compactor (WithCompactor hook) [P2 ✓] tools/{web,net,store,compose,meta,comms} generic tools [P3] BATTERIES (opt-in siblings, each nil-safe + a default): diff --git a/README.md b/README.md index 367d652..80b53d8 100644 --- a/README.md +++ b/README.md @@ -31,15 +31,23 @@ bot) — mort and gadfly are the first two consumers (heavy and light). See [mort]: https://gitea.stevedudenhoeffer.com/steve/mort -**Available today (P0):** +**Available today:** +- `run/` — **executus is runnable.** `run.Executor` ties model resolution, the + tool registry, majordomo's agent loop, context compaction, run-bounding, and + step/audit instrumentation into one `Run(ctx, RunnableAgent, inv) Result`, with + every host concern behind a nil-safe `run.Ports` (Audit/Budget/Critic/ + Checkpointer/PaletteSource/Delivery). See `examples/minimal`. +- `model/` — config-driven tier resolution + failover over majordomo, with + pluggable `UsageSink`/`TraceSink` and `GenerateWith[T]` structured output. +- `tool/` — the tool registry + 3-stage permission model + SSRF guard. +- `compact/` — the per-run context compactor. - `lane/` — bounded worker pool with fair-share queueing (run- and provider-concurrency). - `fanout/` — programmatic N×M swarm with bounded global + per-key concurrency. -- `config/` — the host config seam (`Source`) with an env-var default. -- `deliver/` — the output-egress seam with `Discard`/`Stdout` defaults. -- `identity/` — caller-identity seams (`AdminPolicy`, `MemberResolver`). -- `dispatchguard/`, `pendingattach/`, `run/progress.go` — run-safety primitives. +- `config/`, `deliver/`, `identity/` — host seams (config / output / identity), + each with a shipped default. +- `dispatchguard/`, `pendingattach/` — run-safety primitives. ## Design diff --git a/compact/compactor.go b/compact/compactor.go new file mode 100644 index 0000000..91e5c24 --- /dev/null +++ b/compact/compactor.go @@ -0,0 +1,369 @@ +// V15.2 context compactor (re-based on majordomo). +// +// Why: the agent loop accumulates tool results indefinitely. A +// research-heavy run with many web_search / read_page / http_get +// results easily crosses 200K tokens and trips the model's HTTP-400 +// "prompt too long" rejection mid-run (observed at 410K tokens on +// qwen3-coder:480b which has a 262K cap). majordomo's agent loop calls +// the compactor with the full message slice before every model call; +// the compactor returns a shorter slice that preserves the system +// prompt + recent messages, with the middle range replaced by a +// synthetic summary. +// +// Strategy (unchanged from the agentkit era): +// - Keep any leading system message verbatim. (Under majordomo the +// system prompt normally travels in Request.System, not in the +// message slice, so this is defensive.) +// - Keep the last KeepRecent messages verbatim. This ensures the +// agent has fresh tool state to act on; compacting too aggressively +// would strip the in-flight context it needs to make the next +// decision. +// - Compress the middle range via a single fast-tier LLM call that +// receives the middle messages as raw text and produces a paragraph +// summary (URLs visited, key findings, file_ids created, what the +// agent is trying to accomplish). +// - Replace the middle range with one synthetic user-role message +// containing the summary. (user-role chosen because tool-result-role +// would be ambiguous without a matching tool_call_id.) +// +// What moved in the majordomo conversion: +// - The token-threshold gate lives HERE now. agentkit estimated +// tokens and only invoked the compactor past a configured +// threshold; majordomo's hook fires before every model call, so +// the threshold check (estimateTokens vs the per-run threshold the +// executor computes from the model's context limit) is the +// compactor's first step. +// - The compactor is per-run STATEFUL: majordomo does not replace +// the loop's internal transcript with the compacted slice (the +// hook shapes only what is SENT), so without memory the middle +// would be re-summarised from scratch on every step past the +// threshold. The state remembers how far the transcript has been +// folded into the running summary and folds the previous summary +// into the next one instead of re-paying for it. +// +// Failure path: any error (LLM unavailable, malformed response, etc.) +// is returned to the agent loop, which treats compactor errors as +// non-fatal and sends the original slice — if the next model call hits +// the provider's limit, the existing HTTP-400 error path takes over. + +package compact + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" +) + +// ModelResolver resolves a tier/spec to a usable llm.Model (and an enriched +// context for usage attribution). model.ParseModelForContext satisfies it. +type ModelResolver func(ctx context.Context, tier string) (context.Context, llm.Model, error) + +// Compactor is the per-run compaction hook handed to the agent loop +// (matches the signature agent.WithCompactor expects). +type Compactor = func(ctx context.Context, msgs []llm.Message) ([]llm.Message, error) + +// CompactionEvent describes one fired compaction so the executor can +// log it to skill_run_logs ("compaction_fired"). +type CompactionEvent struct { + // MessagesBefore/After count the messages that would have been + // sent without/with this compaction. + MessagesBefore int + MessagesAfter int + // TokensBefore/After are the estimateTokens values for the same + // two slices. + TokensBefore int + TokensAfter int +} + +// CompactorFactory mints a fresh per-run Compactor bound to a token +// threshold. onFire (nil-safe) observes every compaction that actually +// fires. A non-positive threshold yields a pass-through compactor. +type CompactorFactory func(thresholdTokens int, onFire func(CompactionEvent)) Compactor + +// CompactorConfig controls the v15.2 compactor's behaviour. Construct +// in mort.go from convars + the executor's ModelResolver. +type CompactorConfig struct { + // Models resolves a model spec (typically "fast" or a specific tier) + // to an llm.Model. Required; nil disables compaction. + Models ModelResolver + + // SummarizerTier is the model tier used for the compression LLM call. + // Production default "fast"; an admin may set "haiku" or a specific + // model spec. Empty falls back to "fast". + SummarizerTier string + + // KeepRecent is the number of trailing messages preserved verbatim. + // Default 8. Lower values compact more aggressively (the next loop + // iteration sees less recent context); higher values keep more. + KeepRecent int + + // SummaryWordCap bounds the LLM-generated summary length. Default + // 200 words ≈ 800 chars ≈ 200 tokens — small enough that the + // compaction always shrinks the slice meaningfully. + SummaryWordCap int +} + +// NewCompactor returns a CompactorFactory implementing the middle-range +// summarisation strategy. nil cfg.Models returns a factory of no-op +// compactors that always return the input unchanged (degrades to +// v15.1 behaviour). +func NewCompactor(cfg CompactorConfig) CompactorFactory { + if cfg.Models == nil { + return func(int, func(CompactionEvent)) Compactor { + return func(_ context.Context, msgs []llm.Message) ([]llm.Message, error) { + return msgs, nil + } + } + } + if cfg.SummarizerTier == "" { + cfg.SummarizerTier = "fast" + } + if cfg.KeepRecent <= 0 { + cfg.KeepRecent = 8 + } + if cfg.SummaryWordCap <= 0 { + cfg.SummaryWordCap = 200 + } + return func(threshold int, onFire func(CompactionEvent)) Compactor { + st := &compactionState{} + return func(ctx context.Context, msgs []llm.Message) ([]llm.Message, error) { + return compactIfNeeded(ctx, cfg, st, threshold, onFire, msgs) + } + } +} + +// compactionState is the per-run fold memory: msgs[:prefixEnd] +// (excluding a leading system message) are represented by summaryText +// in the rendered slice. Guarded by mu for safety although the agent +// loop invokes the hook from a single goroutine. +type compactionState struct { + mu sync.Mutex + prefixEnd int + summaryText string +} + +// compactIfNeeded is the workhorse: render the transcript with any +// existing fold applied, check the threshold, and fold more of the +// middle into the summary when the rendered size still exceeds it. +func compactIfNeeded(ctx context.Context, cfg CompactorConfig, st *compactionState, + threshold int, onFire func(CompactionEvent), msgs []llm.Message) ([]llm.Message, error) { + st.mu.Lock() + defer st.mu.Unlock() + + rendered := renderCompacted(st, msgs) + if threshold <= 0 { + return rendered, nil + } + tokensBefore := estimateTokens(rendered) + if tokensBefore < threshold { + return rendered, nil + } + + // Determine the new middle range to fold: everything between what + // is already summarised (or the optional leading system message) + // and the KeepRecent tail. + startMiddle := st.prefixEnd + if startMiddle == 0 && len(msgs) > 0 && msgs[0].Role == llm.RoleSystem { + startMiddle = 1 + } + endMiddle := len(msgs) - cfg.KeepRecent + if endMiddle <= startMiddle { + // Nothing new to fold (the tail alone exceeds the threshold). + // Return the rendered slice; the model call may still succeed. + return rendered, nil + } + middle := msgs[startMiddle:endMiddle] + + summary, err := summariseMiddle(ctx, cfg, st.summaryText, middle) + if err != nil { + // Non-fatal upstream: the agent loop sends the ORIGINAL slice. Return + // msgs, not `rendered` — on a second+ compaction `rendered` already + // carries a prior synthetic summary, which is not the documented + // "original slice" the loop expects on a compactor error. + return msgs, fmt.Errorf("compactor: summarise middle: %w", err) + } + st.summaryText = summary + st.prefixEnd = endMiddle + + out := renderCompacted(st, msgs) + if onFire != nil { + onFire(CompactionEvent{ + MessagesBefore: len(rendered), + MessagesAfter: len(out), + TokensBefore: tokensBefore, + TokensAfter: estimateTokens(out), + }) + } + return out, nil +} + +// renderCompacted applies the fold state to msgs: [optional system] + +// [synthetic summary] + msgs[prefixEnd:]. With no fold yet, msgs is +// returned unchanged. +func renderCompacted(st *compactionState, msgs []llm.Message) []llm.Message { + if st.prefixEnd <= 0 || st.prefixEnd > len(msgs) { + return msgs + } + tail := msgs[st.prefixEnd:] + out := make([]llm.Message, 0, len(tail)+2) + if msgs[0].Role == llm.RoleSystem { + out = append(out, msgs[0]) + } + out = append(out, llm.UserText( + "[CONTEXT COMPACTED] The earlier portion of this conversation was summarised "+ + "to fit the model's context window. Summary:\n\n"+st.summaryText+ + "\n\nResume from the recent messages below.")) + out = append(out, tail...) + return out +} + +// summariseMiddle composes a "compress this transcript" prompt and +// fires one fast-tier LLM call. prevSummary (may be empty) is the +// running summary from earlier compactions; it is folded into the new +// summary so prior context is not lost. +func summariseMiddle(ctx context.Context, cfg CompactorConfig, prevSummary string, middle []llm.Message) (string, error) { + if len(middle) == 0 { + return "", errors.New("compactor: empty middle range") + } + modelCtx, model, err := cfg.Models(ctx, cfg.SummarizerTier) + if err != nil { + return "", fmt.Errorf("compactor: resolve summarizer model %q: %w", cfg.SummarizerTier, err) + } + if model == nil { + return "", errors.New("compactor: summarizer model resolved to nil") + } + if modelCtx != nil { + ctx = modelCtx + } + + var prior string + if strings.TrimSpace(prevSummary) != "" { + prior = "AN EARLIER PORTION WAS ALREADY SUMMARISED AS:\n" + prevSummary + + "\n\nFold that summary into your new one — its facts must survive.\n\n" + } + transcript := renderTranscript(middle) + prompt := fmt.Sprintf( + "You are compressing an in-flight agent's conversation transcript so the agent "+ + "can continue working without blowing its model context. The transcript below is "+ + "a sequence of tool calls and their results. Produce a single paragraph (under %d words) "+ + "that captures:\n"+ + " - WHAT the agent has been trying to accomplish.\n"+ + " - WHICH URLs were visited / fetched (list inline, comma-separated).\n"+ + " - KEY findings or decisions (factual results the agent will need later).\n"+ + " - ANY file_ids or KV keys the agent created — these are persistent state references the agent must keep.\n"+ + " - ANY errors or dead-ends that the agent should not re-try.\n"+ + "DO NOT include verbose HTTP headers, tool-call metadata, error stack traces, or repetitive content. "+ + "DO NOT add commentary or markdown headers. Output prose only.\n\n"+ + "%sTRANSCRIPT TO COMPRESS:\n%s", + cfg.SummaryWordCap, + prior, + transcript, + ) + + resp, err := model.Generate(ctx, llm.Request{Messages: []llm.Message{llm.UserText(prompt)}}) + if err != nil { + return "", fmt.Errorf("compactor: summarise LLM call: %w", err) + } + text := strings.TrimSpace(resp.Text()) + if text == "" { + return "", errors.New("compactor: summarizer returned empty text") + } + return text, nil +} + +// estimateTokens is the chars/4 heuristic over a message slice's text, +// tool calls, and tool results. Images count a flat ~1K tokens each. +// It intentionally matches the coarse estimator the old agentkit loop +// used — the 0.7 threshold ratio provides the safety margin. +func estimateTokens(msgs []llm.Message) int { + chars := 0 + for _, m := range msgs { + for _, p := range m.Parts { + switch v := p.(type) { + case llm.TextPart: + chars += len(v.Text) + case llm.ImagePart: + chars += 4096 + default: + // llm.Part is a sealed-but-extensible interface (future media + // kinds). Count an unknown part conservatively (like an image) + // rather than 0, so a transcript of unrecognised content can't + // silently slip under the compaction threshold and 400 the + // model. Bump this if a large new part kind lands. + _ = v + chars += 4096 + } + } + for _, tc := range m.ToolCalls { + chars += len(tc.Name) + len(tc.Arguments) + } + for _, tr := range m.ToolResults { + chars += len(tr.Content) + } + } + return chars / 4 +} + +// transcriptMessageCap bounds individual message bodies at ~2KB so a +// single ultra-long tool result can't dominate the prompt sent to the +// summarizer. +const transcriptMessageCap = 2048 + +// secretBearingTools name tools whose ARGUMENTS routinely carry credentials or +// message bodies (bearer tokens, API keys, recipients, request bodies). Their +// args are dropped before the transcript reaches the summarizer model — which +// may be a different provider/tier than the run model — mirroring the redaction +// run/steps.go applies to user-facing step summaries. http_* and webhook_* are +// matched by prefix below. +var secretBearingTools = map[string]bool{ + "mcp_call": true, + "email_send": true, +} + +// redactToolArgs returns a summariser-safe rendering of a tool call's args: +// "[redacted]" for known secret-bearing tools, the args verbatim otherwise. +func redactToolArgs(name, args string) string { + if secretBearingTools[name] || + strings.HasPrefix(name, "http_") || + strings.HasPrefix(name, "webhook_") { + return "[redacted]" + } + return args +} + +// renderTranscript flattens a message slice to a plain-text transcript +// suitable for the summarisation prompt. Tool calls show name + (redacted) args, +// tool results show name + body. Empty fields are skipped. +// +// NOTE: tool-RESULT bodies are forwarded to the summarizer (the summary needs +// the findings). A host whose tool results may contain secrets and whose +// summarizer tier resolves to an untrusted provider should ensure that tier is +// trusted, or pre-sanitise results before they reach the agent loop. +func renderTranscript(msgs []llm.Message) string { + var sb strings.Builder + for i, m := range msgs { + fmt.Fprintf(&sb, "---\n[%d] role=%s\n", i+1, m.Role) + if text := m.Text(); text != "" { + sb.WriteString(truncate(text, transcriptMessageCap)) + sb.WriteString("\n") + } + for _, tc := range m.ToolCalls { + fmt.Fprintf(&sb, "tool_call name=%s args=%s\n", tc.Name, truncate(redactToolArgs(tc.Name, string(tc.Arguments)), transcriptMessageCap)) + } + for _, tr := range m.ToolResults { + fmt.Fprintf(&sb, "tool_result name=%s body=%s\n", tr.Name, truncate(tr.Content, transcriptMessageCap)) + } + } + return sb.String() +} + +func truncate(s string, n int) string { + if len(s) <= n { + return s + } + return s[:n] + "...(truncated)" +} diff --git a/examples/minimal/main.go b/examples/minimal/main.go index fdc8976..f184392 100644 --- a/examples/minimal/main.go +++ b/examples/minimal/main.go @@ -1,27 +1,49 @@ -// Command minimal demonstrates executus's standalone core primitives available -// today (P0): the config seam + bounded fan-out. The full zero-config "agentic -// in ~12 lines" example arrives once the model, tool, and run packages land -// (P1–P3). +// Command minimal is executus's "hello, agentic world": wire a model resolver, +// a tool registry, and the run executor, then run an agent. With no batteries +// (Audit/Budget/Critic/Checkpointer/Palette/Delivery all nil) this is a +// bounded, in-memory run — the light-host shape (gadfly's case). +// +// Run it with a provider key for the configured tier, e.g. +// +// ANTHROPIC_API_KEY=sk-... go run ./examples/minimal +// +// Override a tier from the environment without touching code, e.g. +// +// EXECUTUS_MODEL_TIER_FAST=openai/gpt-4o-mini ANTHROPIC_API_KEY= OPENAI_KEY=sk-... go run ./examples/minimal package main import ( "context" "fmt" + "log" "gitea.stevedudenhoeffer.com/steve/executus/config" - "gitea.stevedudenhoeffer.com/steve/executus/fanout" + "gitea.stevedudenhoeffer.com/steve/executus/model" + "gitea.stevedudenhoeffer.com/steve/executus/run" + "gitea.stevedudenhoeffer.com/steve/executus/tool" ) func main() { - cfg := config.Env("EXECUTUS_") // e.g. EXECUTUS_FANOUT_MAX_CONCURRENT=8 - max := cfg.Int("fanout.max_concurrent", 4) + // 1. Configure model tiers: live values come from the environment + // (EXECUTUS_MODEL_TIER_), falling back to these defaults. + model.Configure(config.Env("EXECUTUS_"), map[string]string{ + "fast": "anthropic/claude-haiku-4-5", + "thinking": "anthropic/claude-opus-4-8", + }, 0) - items := []string{"alpha", "beta", "gamma", "delta"} - results := fanout.Run(context.Background(), items, - fanout.Options[string]{MaxConcurrent: max}, - func(_ context.Context, s string) (int, error) { return len(s), nil }) + // 2. Build the executor: a tool registry + the model resolver. No batteries. + ex := run.New(run.Config{ + Registry: tool.NewRegistry(), + Models: model.ParseModelForContext, + }) - for _, r := range results { - fmt.Printf("%-6s -> %d (err=%v)\n", items[r.Index], r.Value, r.Err) + // 3. Run an agent and print its answer. + res := ex.Run(context.Background(), + run.RunnableAgent{Name: "assistant", SystemPrompt: "You are concise.", ModelTier: "fast"}, + tool.Invocation{RunID: "demo-1", CallerID: "local"}, + "In one sentence, what is an agent harness?") + if res.Err != nil { + log.Fatalf("run failed: %v", res.Err) } + fmt.Println(res.Output) } diff --git a/llmmeta/helper.go b/llmmeta/helper.go index 3fabdd0..f5773e2 100644 --- a/llmmeta/helper.go +++ b/llmmeta/helper.go @@ -34,6 +34,7 @@ import ( "context" "encoding/json" "fmt" + "log/slog" "strings" "time" @@ -574,7 +575,9 @@ func (h *Helper) recordLedger(ctx context.Context, call MetaCall) { if h.storage == nil { return } - _ = h.storage.RecordMetaCall(ctx, call) + if err := h.storage.RecordMetaCall(ctx, call); err != nil { + slog.Warn("llmmeta: failed to record ledger row", "err", err) + } } // tryParseJSON attempts to decode text as JSON. Returns the parsed diff --git a/model/call.go b/model/call.go index 30bb964..7e07c07 100644 --- a/model/call.go +++ b/model/call.go @@ -237,7 +237,7 @@ func recordUsage(ctx context.Context, resp *llm.Response) { return } u := resp.Usage - if u.InputTokens == 0 && u.OutputTokens == 0 { + if u.InputTokens == 0 && u.OutputTokens == 0 && u.CacheReadTokens == 0 && u.CacheWriteTokens == 0 { return } model := resolvedModelName(ctx, resp) diff --git a/model/cloud_sync.go b/model/cloud_sync.go index ae28512..9abd00e 100644 --- a/model/cloud_sync.go +++ b/model/cloud_sync.go @@ -314,7 +314,7 @@ func (c *CloudOllamaLimitCache) fetchTags(ctx context.Context) ([]string, error) return nil, err } defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) + body, err := readCapped(resp.Body) if err != nil { return nil, err } @@ -367,7 +367,7 @@ func (c *CloudOllamaLimitCache) fetchContextLength(ctx context.Context, modelNam return 0, err } defer resp.Body.Close() - respBody, err := io.ReadAll(resp.Body) + respBody, err := readCapped(resp.Body) if err != nil { return 0, err } @@ -451,3 +451,24 @@ func truncate(b []byte, n int) string { } return string(b[:n]) + "...(truncated)" } + +// maxLimitCacheResponseBytes bounds the ollama.com limit-cache HTTP responses +// (/api/tags, /api/show) so a misbehaving endpoint can't stream an unbounded +// body before the 15s timeout fires. 1 MiB is far above any real response. +const maxLimitCacheResponseBytes = 1 << 20 + +// readCapped reads up to maxLimitCacheResponseBytes from r and returns a clear +// error if the response EXCEEDS the cap — rather than silently truncating (as a +// bare io.LimitReader does) and letting downstream json.Unmarshal fail with an +// opaque "unexpected end of JSON input". It reads one extra byte to detect the +// overflow. +func readCapped(r io.Reader) ([]byte, error) { + body, err := io.ReadAll(io.LimitReader(r, maxLimitCacheResponseBytes+1)) + if err != nil { + return nil, err + } + if len(body) > maxLimitCacheResponseBytes { + return nil, fmt.Errorf("cloud_sync: response exceeded %d bytes", maxLimitCacheResponseBytes) + } + return body, nil +} diff --git a/model/sink.go b/model/sink.go index 47575e2..664fedb 100644 --- a/model/sink.go +++ b/model/sink.go @@ -16,6 +16,11 @@ import ( // UsageSink receives one record per successful Generate through a model parsed // by this package (ParseModelRequest / ParseModelForContext). Implement it to // meter or bill; the token detail mirrors majordomo's Response.Usage. +// +// IMPORTANT: cacheReadTokens and cacheWriteTokens are PORTIONS of inputTokens, +// not independent additive values (they let a sink price cached vs fresh input +// differently). A sink must NOT compute total = input+output+cacheRead+ +// cacheWrite — that double-counts the cached input. type UsageSink interface { Record(ctx context.Context, model string, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens int) } diff --git a/run/agent.go b/run/agent.go new file mode 100644 index 0000000..18a0a98 --- /dev/null +++ b/run/agent.go @@ -0,0 +1,76 @@ +package run + +import "time" + +// RunnableAgent is the kernel's view of "a thing to run": an identity, a model +// tier, a system prompt, execution caps, and a tool palette. It is a plain DTO +// on purpose — the run kernel never imports a noun battery. The persona Agent +// and the saved Skill each LOWER themselves into a RunnableAgent (a ToRunnable +// method on the battery side), and the kernel runs the DTO. This is the +// inversion of mort's agentexec.Executor.Run(*agents.Agent): the executor no +// longer depends on the persona struct, only on this shape. +// +// A light host can build a RunnableAgent inline (model tier + prompt + a few +// tool names) for a one-shot bounded run, with no persona or skill battery at +// all — that is exactly gadfly's swarm task. +type RunnableAgent struct { + // ID is a stable identifier for the run subject (an agent/skill UUID, or + // any host-chosen id). Used for audit attribution and dispatch-guard + // genealogy. Empty is allowed for anonymous one-shot runs. + ID string + + // Name is a human label (audit/logs/delivery). Empty is allowed. + Name string + + // SystemPrompt is the agent's base system prompt (before per-run + // personalization, which a host layers via Ports). + SystemPrompt string + + // ModelTier is a tier alias or concrete spec resolved through + // model.ParseModelForContext. Empty resolves to the host's default tier. + ModelTier string + + // MaxIterations caps the agent loop's tool-dispatch steps. 0 = kernel + // default. MaxRuntime caps wall-clock for the whole run (the kernel starts + // this clock AFTER any lane dequeue, not at submission). 0 = kernel + // default. + MaxIterations int + MaxRuntime time.Duration + + // LowLevelTools are tool-registry names the run may call directly. + // SkillPalette / SubAgentPalette name saved skills / sub-agents exposed as + // skill__ / agent__ delegation tools, resolved through + // Ports.Palette (nil Palette => those entries are inert). + LowLevelTools []string + SkillPalette []string + SubAgentPalette []string + + // Phases optionally model a multi-step pipeline (each phase its own prompt + // + tier + tools). An empty slice is a single-phase run — the common case. + Phases []Phase + + // Critic configures the optional two-tier run-critic (Ports.Critic). The + // zero value (disabled) is the light-host default. + Critic CriticConfig +} + +// Phase is one step of a multi-step run: its own system prompt, model tier, +// iteration cap, and tool subset. Optional phases may be skipped by the +// pipeline when their precondition isn't met. +type Phase struct { + Name string + SystemPrompt string + ModelTier string + MaxIterations int + Tools []string + Optional bool +} + +// CriticConfig configures the optional run-critic. Enabled gates whether a +// critic monitor is started at all; BackstopMultiplier sets the hard-kill +// deadline as a multiple of the soft trigger (MaxRuntime). A non-positive +// multiplier uses the kernel default. +type CriticConfig struct { + Enabled bool + BackstopMultiplier float64 +} diff --git a/run/executor.go b/run/executor.go new file mode 100644 index 0000000..53d6940 --- /dev/null +++ b/run/executor.go @@ -0,0 +1,318 @@ +package run + +import ( + "context" + "errors" + "fmt" + "time" + + "gitea.stevedudenhoeffer.com/steve/majordomo/agent" + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" + + "gitea.stevedudenhoeffer.com/steve/executus/compact" + "gitea.stevedudenhoeffer.com/steve/executus/tool" +) + +// ModelResolver resolves a tier alias or concrete spec to a usable llm.Model +// and an enriched context (for usage attribution). model.ParseModelForContext +// satisfies it. +type ModelResolver func(ctx context.Context, tier string) (context.Context, llm.Model, error) + +// Defaults are the executor's fallback caps and loop guards, applied per run +// when the RunnableAgent leaves a field zero. +type Defaults struct { + MaxIterations int // tool-dispatch steps; default 12 + MaxRuntime time.Duration // wall-clock per run; default 60s + FallbackTier string // tier when the agent's is empty; default "fast" + MaxConsecutiveToolErrors int // loop guard; default 3 + MaxSameToolCallRepeats int // retry-storm guard; default 3 + CompactionThresholdRatio float64 // fraction of model context to compact at; default 0.7 +} + +func (d Defaults) withFallbacks() Defaults { + if d.MaxIterations <= 0 { + d.MaxIterations = 12 + } + if d.MaxRuntime <= 0 { + d.MaxRuntime = 60 * time.Second + } + if d.FallbackTier == "" { + d.FallbackTier = "fast" + } + if d.MaxConsecutiveToolErrors <= 0 { + d.MaxConsecutiveToolErrors = 3 + } + if d.MaxSameToolCallRepeats <= 0 { + d.MaxSameToolCallRepeats = 3 + } + if d.CompactionThresholdRatio <= 0 { + d.CompactionThresholdRatio = 0.7 + } + return d +} + +// Config wires an Executor. Registry + Models are required; everything else is +// optional and nil-safe — the zero Config beyond those yields a bounded, +// in-memory run with no persistence/audit/budget/critic/delegation/compaction +// (gadfly's case). +type Config struct { + Registry tool.Registry + Models ModelResolver + Defaults Defaults + Ports Ports + + // Compactor mints the per-run context-compaction hook. nil disables + // compaction. ContextTokens resolves a tier's model context-window (for + // the compaction threshold); nil — or a zero return — also disables it. + Compactor compact.CompactorFactory + ContextTokens func(tier string) int + + // SystemHeader is an optional platform header prepended to every agent's + // system prompt. + SystemHeader string +} + +// Executor runs a RunnableAgent through majordomo's agent loop with the wired +// Ports. Construct with New; safe for concurrent use across runs. +type Executor struct { + cfg Config +} + +// New builds an Executor. It panics if Registry or Models is nil — those are +// structural, not runtime, errors. +func New(cfg Config) *Executor { + if cfg.Registry == nil || cfg.Models == nil { + panic("run.New: Registry and Models are required") + } + cfg.Defaults = cfg.Defaults.withFallbacks() + return &Executor{cfg: cfg} +} + +// Result is one run's outcome. Err carries the run failure (if any); the other +// fields are populated best-effort even on error (partial output/steps/usage). +type Result struct { + RunID string + Output string + Steps []tool.Step + Usage llm.Usage + Err error +} + +// Run executes ra with the given invocation + input and returns the Result. It +// never propagates a panic; failures surface in Result.Err. +func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocation, input string) Result { + started := time.Now() + res := Result{RunID: inv.RunID} + + tier := ra.ModelTier + if tier == "" { + tier = e.cfg.Defaults.FallbackTier + } + maxIter := ra.MaxIterations + if maxIter <= 0 { + maxIter = e.cfg.Defaults.MaxIterations + } + maxRuntime := ra.MaxRuntime + if maxRuntime <= 0 { + maxRuntime = e.cfg.Defaults.MaxRuntime + } + + // Budget gate (pre-run): a rejected run makes no model call. + if e.cfg.Ports.Budget != nil { + if err := e.cfg.Ports.Budget.Check(ctx, inv.CallerID); err != nil { + res.Err = err + return res + } + } + + // Resolve the model (enriches ctx for usage attribution). + modelCtx, model, err := e.cfg.Models(ctx, tier) + if err != nil { + res.Err = fmt.Errorf("resolve model %q: %w", tier, err) + return res + } + if model == nil { + // A resolver returning (ctx, nil, nil) would otherwise nil-panic inside + // the agent loop; surface it as a clean error (Run never panics out). + res.Err = fmt.Errorf("resolve model %q: resolver returned a nil model", tier) + return res + } + ctx = modelCtx + + // Audit start (optional). The recorder satisfies RunTally; stamp it on the + // invocation so a self-status tool can read live progress. + var rec RunRecorder + var stateAcc *RunStateAccessor + if e.cfg.Ports.Audit != nil { + rec = e.cfg.Ports.Audit.StartRun(ctx, RunInfo{ + RunID: inv.RunID, + SubjectID: ra.ID, + Name: ra.Name, + CallerID: inv.CallerID, + ChannelID: inv.ChannelID, + ParentRunID: inv.ParentRunID, + Inputs: inv.SkillInputs, + StartedAt: started, + }) + } + if rec != nil { + stateAcc = NewRunStateAccessor(rec, maxIter, 0, started) + inv.RunState = stateAcc + } + + // Build the toolbox from the agent's low-level tools. + toolbox, err := e.cfg.Registry.Build(ra.LowLevelTools, inv, tool.Visibility("private"), nil) + if err != nil { + res.Err = fmt.Errorf("build toolbox: %w", err) + e.finishAudit(ctx, rec, "error", res, started, res.Err) + return res + } + + // Run context: bound by MaxRuntime, detached from the caller's deadline so a + // lane/queue wait doesn't eat the run budget (mort's V10 lesson). Caller + // cancellation still propagates via MergeCancellation. Created BEFORE the + // step observer so the observer forwards the merged run context (not a + // possibly-cancelled caller ctx) to OnStep consumers. + runCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), maxRuntime) + defer cancel() + runCtx, mergeCancel := MergeCancellation(runCtx, ctx) + defer mergeCancel() + + // Step instrumentation: accumulate Result.Steps + fire inv.OnStep, feed the + // audit recorder, and keep the live iteration counter fresh. majordomo's + // step observer hands us each completed iteration; we zip the model's tool + // calls with their executed results PAIRWISE — a result without a matching + // call (or a call without a result) is skipped rather than recorded as an + // empty-name "ghost" step. + emitter := newStepEmitter(inv.OnStep) + stepObserver := func(s agent.Step) { + if stateAcc != nil { + stateAcc.SetIteration(s.Index) + } + if rec != nil { + rec.OnStep(s.Index, s.Response) + } + var calls []llm.ToolCall + if s.Response != nil { + calls = s.Response.ToolCalls + } + n := len(s.Results) + if len(calls) < n { + n = len(calls) + } + for i := 0; i < n; i++ { + call, r := calls[i], s.Results[i] + emitter.toolStart(runCtx, call.Name, call.Arguments) + emitter.toolEnd(runCtx, call, r.Content, r.IsError) + if rec != nil { + rec.OnTool(call, r.Content) + } + } + } + + opts := []agent.Option{ + agent.WithToolbox(toolbox), + agent.WithMaxSteps(maxIter), + agent.WithToolErrorLimits(e.cfg.Defaults.MaxConsecutiveToolErrors, e.cfg.Defaults.MaxSameToolCallRepeats), + agent.WithStepObserver(stepObserver), + } + if e.cfg.Compactor != nil && e.cfg.ContextTokens != nil { + if threshold := e.compactionThreshold(tier); threshold > 0 { + // Forward compaction events to the audit log (makes the + // CompactionEvent doc's "logged to the run trace" promise true). + var onFire func(compact.CompactionEvent) + if rec != nil { + onFire = func(ev compact.CompactionEvent) { + rec.LogEvent("compaction_fired", map[string]any{ + "messages_before": ev.MessagesBefore, + "messages_after": ev.MessagesAfter, + "tokens_before": ev.TokensBefore, + "tokens_after": ev.TokensAfter, + }) + } + } + opts = append(opts, agent.WithCompactor(e.cfg.Compactor(threshold, onFire))) + } + } + + ag := agent.New(model, e.systemPrompt(ra), opts...) + runRes, runErr := ag.Run(runCtx, input) + + status := statusFor(runErr) + if runRes != nil { + res.Output = runRes.Output + res.Usage = runRes.Usage + } + res.Steps = emitter.snapshot() + res.Err = runErr + + e.finishAudit(ctx, rec, status, res, started, runErr) + if e.cfg.Ports.Budget != nil { + e.cfg.Ports.Budget.Commit(detach(ctx), inv.CallerID, time.Since(started).Seconds()) + } + return res +} + +// statusFor maps a run error to a RunStats.Status, distinguishing a deadline +// (timeout) and a cancellation (cancelled — caller cancel or shutdown) from a +// generic error so audit consumers can tell them apart. +func statusFor(runErr error) string { + switch { + case runErr == nil: + return "ok" + case errors.Is(runErr, context.DeadlineExceeded): + return "timeout" + case errors.Is(runErr, context.Canceled): + return "cancelled" + default: + return "error" + } +} + +// finishAudit writes the terminal roll-up on a detached context so a cancelled +// run still records (mort's CleanupContextTimeout lesson). +func (e *Executor) finishAudit(ctx context.Context, rec RunRecorder, status string, res Result, started time.Time, runErr error) { + if rec == nil { + return + } + stats := RunStats{ + Status: status, + Output: res.Output, + ToolCalls: rec.ToolCallsCount(), + RuntimeSeconds: time.Since(started).Seconds(), + } + if runErr != nil { + stats.Error = runErr.Error() + } + stats.InputTokens, stats.OutputTokens, stats.ThinkingTokens = rec.TokenStats() + rec.Close(detach(ctx), stats) +} + +func (e *Executor) systemPrompt(ra RunnableAgent) string { + if e.cfg.SystemHeader == "" { + return ra.SystemPrompt + } + if ra.SystemPrompt == "" { + return e.cfg.SystemHeader + } + return e.cfg.SystemHeader + "\n\n" + ra.SystemPrompt +} + +// compactionThreshold returns the token threshold for the tier's model context +// window (ratio × limit), or 0 when the limit is unknown. +func (e *Executor) compactionThreshold(tier string) int { + max := e.cfg.ContextTokens(tier) + if max <= 0 { + return 0 + } + return int(float64(max) * e.cfg.Defaults.CompactionThresholdRatio) +} + +// detach derives a bounded cleanup context off ctx, detached from its +// cancellation, for post-run writes. The cancel is intentionally not returned; +// CleanupContextTimeout bounds the lifetime. +func detach(ctx context.Context) context.Context { + c, cancel := context.WithTimeout(context.WithoutCancel(ctx), CleanupContextTimeout) + _ = cancel // bounded by the timeout; nothing to cancel early + return c +} diff --git a/run/executor_test.go b/run/executor_test.go new file mode 100644 index 0000000..250932c --- /dev/null +++ b/run/executor_test.go @@ -0,0 +1,168 @@ +package run + +import ( + "context" + "errors" + "fmt" + "testing" + + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" + "gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake" + + "gitea.stevedudenhoeffer.com/steve/executus/tool" +) + +// fakeModels returns a ModelResolver backed by a fake provider scripted to +// reply with the given text (no tool calls — the loop terminates immediately). +func fakeModels(t *testing.T, reply string) ModelResolver { + t.Helper() + fp := fake.New("fake") + fp.Enqueue("test-model", fake.Reply(reply)) + m, err := fp.Model("test-model") + if err != nil { + t.Fatalf("fake model: %v", err) + } + return func(ctx context.Context, _ string) (context.Context, llm.Model, error) { + return ctx, m, nil + } +} + +// TestExecutorRunHelloWorld is the milestone: executus runs an agent end-to-end +// against the fake provider and returns its output. Proves the kernel is +// runnable with the zero Ports (no persistence/audit/budget/critic). +func TestExecutorRunHelloWorld(t *testing.T) { + ex := New(Config{ + Registry: tool.NewRegistry(), + Models: fakeModels(t, "hello from executus"), + }) + + res := ex.Run(context.Background(), + RunnableAgent{Name: "greeter", SystemPrompt: "be brief", ModelTier: "test-model"}, + tool.Invocation{RunID: "run-1", CallerID: "caller-1"}, + "say hi") + + if res.Err != nil { + t.Fatalf("run error: %v", res.Err) + } + if res.Output != "hello from executus" { + t.Fatalf("output = %q, want %q", res.Output, "hello from executus") + } + if res.RunID != "run-1" { + t.Errorf("RunID = %q, want run-1", res.RunID) + } +} + +// TestExecutorBudgetRejection: a Budget that denies makes no model call. +func TestExecutorBudgetRejection(t *testing.T) { + denied := errors.New("over budget") + var modelCalled bool + models := func(ctx context.Context, _ string) (context.Context, llm.Model, error) { + modelCalled = true + return ctx, nil, nil + } + ex := New(Config{ + Registry: tool.NewRegistry(), + Models: models, + Ports: Ports{Budget: budgetFunc{check: func(string) error { return denied }}}, + }) + + res := ex.Run(context.Background(), + RunnableAgent{ModelTier: "test-model"}, + tool.Invocation{RunID: "r", CallerID: "broke"}, "hi") + + if !errors.Is(res.Err, denied) { + t.Fatalf("err = %v, want budget denial", res.Err) + } + if modelCalled { + t.Error("model must not be resolved/called when budget denies") + } +} + +// TestExecutorAuditWiring: the Audit port receives StartRun + Close with the +// terminal status/output. +func TestExecutorAuditWiring(t *testing.T) { + rec := &captureRecorder{} + ex := New(Config{ + Registry: tool.NewRegistry(), + Models: fakeModels(t, "done"), + Ports: Ports{Audit: auditFunc{start: func(RunInfo) RunRecorder { return rec }}}, + }) + + res := ex.Run(context.Background(), + RunnableAgent{ModelTier: "test-model"}, + tool.Invocation{RunID: "r2", CallerID: "c"}, "go") + + if res.Err != nil { + t.Fatalf("run error: %v", res.Err) + } + if !rec.closed { + t.Fatal("recorder.Close was not called") + } + if rec.stats.Status != "ok" { + t.Errorf("close status = %q, want ok", rec.stats.Status) + } + if rec.stats.Output != "done" { + t.Errorf("close output = %q, want done", rec.stats.Output) + } +} + +// --- test doubles --- + +type budgetFunc struct{ check func(callerID string) error } + +func (b budgetFunc) Check(_ context.Context, callerID string) error { return b.check(callerID) } +func (b budgetFunc) Commit(context.Context, string, float64) {} + +type auditFunc struct{ start func(RunInfo) RunRecorder } + +func (a auditFunc) StartRun(_ context.Context, info RunInfo) RunRecorder { return a.start(info) } + +type captureRecorder struct { + closed bool + stats RunStats + steps int + tools int +} + +func (r *captureRecorder) TokenStats() (in, out, thinking int64) { return 0, 0, 0 } +func (r *captureRecorder) ToolCallsCount() int { return r.tools } +func (r *captureRecorder) OnStep(int, *llm.Response) { r.steps++ } +func (r *captureRecorder) OnTool(llm.ToolCall, string) { r.tools++ } +func (r *captureRecorder) LogEvent(string, map[string]any) {} +func (r *captureRecorder) LogError(string) {} +func (r *captureRecorder) Close(_ context.Context, s RunStats) { r.closed = true; r.stats = s } + +// TestExecutorNilModelNoPanic: a resolver returning (ctx, nil, nil) yields a +// clean error, not a nil-pointer panic (gadfly F1, high severity). +func TestExecutorNilModelNoPanic(t *testing.T) { + ex := New(Config{ + Registry: tool.NewRegistry(), + Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { + return ctx, nil, nil // nil model, nil error + }, + }) + res := ex.Run(context.Background(), + RunnableAgent{ModelTier: "x"}, tool.Invocation{RunID: "r"}, "hi") + if res.Err == nil { + t.Fatal("expected an error for a nil model, got nil (would have panicked in the loop)") + } +} + +// TestStatusFor maps run errors to RunStats.Status (gadfly F3). +func TestStatusFor(t *testing.T) { + cases := []struct { + err error + want string + }{ + {nil, "ok"}, + {context.DeadlineExceeded, "timeout"}, + {context.Canceled, "cancelled"}, + {fmt.Errorf("wrapped: %w", context.DeadlineExceeded), "timeout"}, + {errors.New("boom"), "error"}, + } + for _, c := range cases { + if got := statusFor(c.err); got != c.want { + t.Errorf("statusFor(%v) = %q, want %q", c.err, got, c.want) + } + } +} diff --git a/run/ports.go b/run/ports.go new file mode 100644 index 0000000..e18ac74 --- /dev/null +++ b/run/ports.go @@ -0,0 +1,168 @@ +package run + +import ( + "context" + "time" + + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" + + "gitea.stevedudenhoeffer.com/steve/executus/deliver" +) + +// Ports are the host seams the run executor consumes. Every field is nil-safe: +// a light host passes the zero Ports and gets a bounded, in-memory run with no +// persistence, audit, budget, critic, delegation, or delivery — which is +// exactly a gadfly swarm task. A heavy host (mort) wires each one to a battery. +// +// This struct IS the inversion: in mort, agentexec imports agents / +// agentcritic / skillaudit and skillexec imports skills / paste directly; here +// the kernel depends only on these interfaces, and the batteries implement +// them. The mort_*_adapters.go wall becomes the set of impls. +type Ports struct { + // Audit records the run trace (start, per-step/per-tool events, final + // stats). nil = no audit. + Audit Audit + // Budget gates and meters per-caller resource use. nil = unbounded. + Budget Budget + // Critic optionally monitors a long run for hangs/runaways. nil = none. + Critic Critic + // Checkpointer persists resumable progress for durable recovery. nil = no + // checkpointing (a run interrupted by shutdown is simply lost). + Checkpointer Checkpointer + // Palette resolves SkillPalette / SubAgentPalette entries into delegation + // tools (skill__ / agent__). nil = those entries are inert. + Palette PaletteSource + // Delivery is where the run's output + artifacts go. nil = the caller + // reads the Result in-process (the light-host default). + Delivery deliver.Delivery +} + +// RunInfo describes a run at start time — the attribution a recorder/critic +// needs. Host-neutral rename of mort's SkillRun start fields. +type RunInfo struct { + RunID string + SubjectID string // the agent/skill id being run (audit "skill_id") + Name string + CallerID string + ChannelID string + ParentRunID string + Inputs map[string]any + StartedAt time.Time +} + +// RunStats is the terminal roll-up a recorder's Close writes. Mirrors mort's +// skillaudit/skillexec RunStats. +type RunStats struct { + Status string // ok | error | timeout | budget_exceeded | cancelled | dry_run + Output string + Error string + ToolCalls int + RuntimeSeconds float64 + InputTokens int64 + OutputTokens int64 + ThinkingTokens int64 +} + +// --- Audit --- + +// Audit begins recording a run. StartRun returns a per-run RunRecorder (or nil +// to skip recording this run). The audit battery wires its Storage behind this. +type Audit interface { + StartRun(ctx context.Context, info RunInfo) RunRecorder +} + +// RunRecorder records the events of one in-flight run and its final stats. It +// satisfies RunTally so the kernel can surface live token/tool counts to the +// self-status tool. Mirrors mort's skillaudit.Writer. +type RunRecorder interface { + RunTally + // OnStep records one completed agent-loop iteration's model response. + OnStep(iter int, resp *llm.Response) + // OnTool records one executed tool call + its result. + OnTool(call llm.ToolCall, result string) + // LogEvent / LogError append structured events to the run log. + LogEvent(eventType string, payload map[string]any) + LogError(msg string) + // Close writes the terminal roll-up. Detaches from the caller's context + // internally so a cancelled run still records. + Close(ctx context.Context, stats RunStats) +} + +// --- Budget --- + +// Budget gates and meters per-caller resource use. Mirrors mort's +// skillexec.BudgetTracker. +type Budget interface { + // Check reports whether the caller has remaining budget (nil = allowed). + Check(ctx context.Context, callerID string) error + // Commit records that the caller spent runtimeSeconds on this run. + Commit(ctx context.Context, callerID string, runtimeSeconds float64) +} + +// --- Critic --- + +// Critic optionally monitors a long-running run (the two-tier soft/hard +// timeout). Monitor returns a handle the executor feeds progress into and +// queries for steer/deadline decisions; a nil handle means "not monitored". +// +// The exact wiring (how the handle's Steer/Deadline bind into majordomo's +// agent.WithSteer / agent.WithMaxStepsFunc / run-context cancellation) is +// finalized in the executor; this is the seam the agentcritic battery adapts. +type Critic interface { + Monitor(ctx context.Context, info RunInfo, softTimeout time.Duration) CriticHandle +} + +// CriticHandle is the executor's live link to a run's critic. +type CriticHandle interface { + // RecordStep / RecordToolStart keep the critic's activity clock fresh so a + // healthy-but-slow run is not mistaken for a hang. + RecordStep(iter int) + RecordToolStart(name, args string) + // Steer returns any messages the critic wants injected into the loop (a + // nudge), drained before each step — matches majordomo agent.WithSteer. + Steer() []llm.Message + // Deadline returns the current hard-kill deadline (the critic may extend + // it); the executor binds the run context to it. Zero = no hard deadline. + Deadline() time.Time + // Stop ends monitoring when the run finishes. + Stop() +} + +// --- Checkpointer --- + +// Checkpointer persists a run's resumable progress for durable recovery. +// Mirrors mort's agentexec.RunCheckpointer. +type Checkpointer interface { + // Save persists the run's current resumable progress (throttled). + Save(ctx context.Context, st RunCheckpointState) error + // Complete clears the checkpoint on success. + Complete(ctx context.Context) error + // Fail clears the checkpoint on terminal failure. A run interrupted by + // shutdown is left untouched so boot recovery picks it up. + Fail(ctx context.Context, err error) error +} + +// RunCheckpointState is the resumable snapshot a Checkpointer persists. Kept +// minimal here; the executor extends what it records during the merge. +type RunCheckpointState struct { + Messages []llm.Message + Iteration int +} + +// --- PaletteSource --- + +// PaletteSource resolves a RunnableAgent's SkillPalette / SubAgentPalette names +// into delegation tools and invokes them. Mirrors mort's +// SkillInvokerForPalette + AgentInvokerForPalette. nil Palette => palette +// entries are inert ("not configured" at first call). +type PaletteSource interface { + ResolveSkill(ctx context.Context, callerID, name string) (skillID string, err error) + InvokeSkill(ctx context.Context, callerID, channelID, name string, + inputs map[string]any, parentRunID string) (output, runID, status string, err error) + + ResolveAgent(ctx context.Context, callerID, name string) (agentID string, err error) + InvokeAgent(ctx context.Context, callerID, channelID, name string, + prompt, parentRunID, modelTierOverride, promptPrepend string, + toolsSubset []string, + onEvent func(ctx context.Context, event, emoji string)) (output, runID, status string, err error) +} diff --git a/run/runengine.go b/run/runengine.go new file mode 100644 index 0000000..2b311e4 --- /dev/null +++ b/run/runengine.go @@ -0,0 +1,157 @@ +// Package run is executus's run kernel: the shared run-loop mechanics around +// majordomo's agent loop, plus the host seams (run.Ports / RunnableAgent) that +// let one executor serve every surface — a light host's bounded one-shot run, +// a heavy host's persona agent or saved skill — without the kernel importing a +// battery. +// +// This file holds the genuinely-identical scaffolding both run shapes need: +// context cancellation merging, the detached-cleanup timeout, the per-run +// progress accessor the self-status tool reads, the legacy `submit` +// compatibility tool (submit.go), the ancestor progress bridge (progress.go), +// and the run-finalizer machinery — one source of truth. +// +// The kernel depends only on majordomo + executus/tool + the run.Ports +// interfaces; persistence, audit, the persona/skill nouns, and the critic are +// host-supplied via Ports (see ports.go) so importing the kernel never drags in +// a store or a battery. +package run + +import ( + "context" + "errors" + "log/slog" + "sync/atomic" + "time" + + "gitea.stevedudenhoeffer.com/steve/executus/tool" +) + +// ErrShutdown is the cancellation cause set on mort's base lifecycle context +// when the process is shutting down (SIGTERM after the drain window). The +// agent executor uses it to distinguish a run interrupted by shutdown (which +// should be left durable-recoverable) from a run that errored or hit its own +// deadline (terminal). +var ErrShutdown = errors.New("mort: shutting down") + +// CleanupContextTimeout caps how long a run's post-completion cleanup ops +// (budget commit, audit Close, attachment bookkeeping) may wait on +// storage after detaching from the caller's — possibly already +// cancelled — context. 10s is generous for a single-row UPDATE against +// MySQL; longer suggests a hung connection the run goroutine shouldn't +// keep waiting on. Both executors derive their cleanup contexts as +// context.WithTimeout(context.WithoutCancel(ctx), CleanupContextTimeout). +const CleanupContextTimeout = 10 * time.Second + +// Reserved state-react lifecycle event keys, shared so both nouns surface +// the same UX shape. Namespaced with double-underscores to make accidental +// collision with a tool name near-impossible. +const ( + StateReactStart = "__start__" + StateReactEnd = "__end__" + StateReactError = "__error__" + StateReactBudgetExceeded = "__budget_exceeded__" +) + +// MergeCancellation returns a context cancelled when EITHER input is +// cancelled, propagating the cancellation Cause from whichever fired. Used +// by the lane preemption path (the lane's per-job ctx.Cause flows into the +// run context) and by the runtime-detach path (process shutdown still +// reaches a run whose deadline was reset after a lane wait). Always call +// the returned cancel to release the watcher goroutine; it is also invoked +// once when either input fires. +func MergeCancellation(parent, secondary context.Context) (context.Context, context.CancelFunc) { + merged, cancel := context.WithCancelCause(parent) + go func() { + select { + case <-merged.Done(): + return + case <-secondary.Done(): + cancel(context.Cause(secondary)) + } + }() + return merged, func() { cancel(nil) } +} + +// RunFinalizer is invoked at run finish so per-run tool state (open HTTP +// streams, per-run code_exec counters, per-run search budgets) is released +// and the process-lifetime maps keyed by run id don't grow unbounded. +// Both executors fire their registered finalizers via FireFinalizers. +type RunFinalizer interface { + FinalizeRun(runID string) +} + +// FireFinalizers runs every finalizer for runID, isolating each behind a +// panic-recover so one buggy finalizer can't take down the run goroutine +// or skip the others. Safe to call with a nil/empty slice. +func FireFinalizers(fs []RunFinalizer, runID string) { + for _, f := range fs { + if f == nil { + continue + } + func() { + defer func() { + if r := recover(); r != nil { + slog.Error("runengine: run finalizer panicked", + "run_id", runID, "panic", r) + } + }() + f.FinalizeRun(runID) + }() + } +} + +// RunTally is the narrow live-progress source the RunStateAccessor reads — +// the running token and tool-call counts for the in-flight run. The audit +// battery's writer satisfies it; this interface is how the run kernel reads +// live tallies without importing the audit package (the inversion of mort's +// direct *skillaudit.Writer dependency). +type RunTally interface { + // TokenStats returns the running input, output, and thinking token totals. + TokenStats() (in, out, thinking int64) + // ToolCallsCount returns the number of tool calls executed so far. + ToolCallsCount() int +} + +// RunStateAccessor is the per-run live-progress accessor the executor +// stamps on Invocation.RunState before building the toolbox, so the +// self-status tool can report iteration / tool-calls / tokens / elapsed for +// the in-flight run. Construct with NewRunStateAccessor; the executor's step +// observer calls SetIteration each loop. +type RunStateAccessor struct { + tally RunTally + iter atomic.Int32 + maxIter int + maxCalls int + startedAt time.Time +} + +// NewRunStateAccessor builds the accessor. writer supplies the live token +// + tool-call tallies; maxIter / maxCalls are the reported caps (0 = +// uncapped); startedAt anchors the elapsed clock. +func NewRunStateAccessor(tally RunTally, maxIter, maxCalls int, startedAt time.Time) *RunStateAccessor { + return &RunStateAccessor{ + tally: tally, + maxIter: maxIter, + maxCalls: maxCalls, + startedAt: startedAt, + } +} + +// SetIteration records the current agent-loop iteration (called from the +// executor's step observer). +func (a *RunStateAccessor) SetIteration(iter int) { a.iter.Store(int32(iter)) } + +// RunState satisfies tool.RunStateAccessor. +func (a *RunStateAccessor) RunState() tool.RunState { + in, out, think := a.tally.TokenStats() + return tool.RunState{ + Iteration: int(a.iter.Load()), + MaxIterations: a.maxIter, + ToolCalls: a.tally.ToolCallsCount(), + MaxToolCalls: a.maxCalls, + InputTokens: in, + OutputTokens: out, + ThinkingTokens: think, + ElapsedSeconds: int(time.Since(a.startedAt).Seconds()), + } +} diff --git a/run/steps.go b/run/steps.go new file mode 100644 index 0000000..16d6d08 --- /dev/null +++ b/run/steps.go @@ -0,0 +1,419 @@ +package run + +// steps.go — the per-run step emitter and the tool→step presentation +// mapping. This is the single place that turns the executor's post-step +// observer into ordered tool.Step records: one per tool call, each with a +// stable id, an open-vocabulary kind, and a human present-tense summary +// that flips running→complete/error. +// +// One source feeds two consumers (mirroring the OnEvent/OnToolEvent/ +// PostRunResult pattern): the live tool.Invocation.OnStep callback +// (nil-safe) AND snapshot(), which the executor copies onto Result.Steps. +// Because the Result accumulation does not depend on OnStep being set, +// every surface — chat (JSON + SSE), Discord, cron, sub-agents — carries +// steps; OnStep is needed only for live streaming. +// +// LIMITATION (current): majordomo exposes only a POST-step observer, so +// the executor calls toolStart+toolEnd back-to-back after each tool has +// already run. Steps are therefore recorded faithfully, but step.StartedAt +// ≈ EndedAt and the intermediate "running" phase is never observable to a +// live OnStep consumer. A pre-dispatch hook (wrapping each tool's handler +// to emit toolStart before execution, like mort's state-react decorator) +// is a follow-up that would restore real start timing + the running phase. +// The emitter already supports that two-call shape — toolStart and toolEnd +// are separate methods — so wiring it later is additive. + +import ( + "context" + "encoding/json" + "fmt" + "net/url" + "strings" + "time" + + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" + + "gitea.stevedudenhoeffer.com/steve/executus/tool" +) + +// stepSummaryMaxLen caps the human summary length (section G size cap). +// Detail is unused in v1 (no live detail source while replies are +// generated blocking) so there is no Detail cap yet. +const stepSummaryMaxLen = 200 + +// stepEmitter accumulates ordered steps for one run and fires the live +// OnStep callback. +// +// Concurrency: touched ONLY from the agent-loop goroutine — the executor's +// stepObserver (and, once a pre-dispatch hook is wired, that hook) run +// there; majordomo executes a step's tool calls sequentially, and +// sub-agents build their own Invocation so they never reach this +// emitter. Same single-goroutine contract as the audit Writer and the +// critic ProgressRecorder — no internal lock. +type stepEmitter struct { + onStep func(ctx context.Context, ev tool.StepEvent) + now func() time.Time + + seq int + steps []tool.Step // ordered; the snapshot for Result.Steps + byID map[string]int // step id -> index into steps + pending map[string][]string // correlation key -> queued running ids (FIFO) +} + +// newStepEmitter returns an emitter that forwards to onStep (nil-safe). +func newStepEmitter(onStep func(ctx context.Context, ev tool.StepEvent)) *stepEmitter { + return &stepEmitter{ + onStep: onStep, + now: time.Now, + byID: map[string]int{}, + pending: map[string][]string{}, + } +} + +// corrKey correlates a "start" (name + raw args, no call id available at +// the pre-dispatch hook) with its later "end" (the stepObserver has the +// full call incl. id + the same raw args). +func corrKey(name string, args json.RawMessage) string { + return name + "\x00" + string(args) +} + +// toolStart records + emits the "start" of a tool call. Called from the +// pre-dispatch hookToolbox closure, before the tool runs. +func (e *stepEmitter) toolStart(ctx context.Context, name string, args json.RawMessage) { + if e == nil { + return + } + step := e.newStep(name, args) + key := corrKey(name, args) + e.pending[key] = append(e.pending[key], step.ID) + e.fire(ctx, "start", step) +} + +// toolEnd records + emits the terminal "end" of a tool call. Called from +// the stepObserver for each completed tool call. If no matching start was +// seen (e.g. a tool with a nil handler the pre-dispatch hook skipped), a +// start is synthesized so the step still appears. +func (e *stepEmitter) toolEnd(ctx context.Context, call llm.ToolCall, result string, isError bool) { + if e == nil { + return + } + id := e.matchPending(call.Name, call.Arguments) + if id == "" { + id = e.newStep(call.Name, call.Arguments).ID + } + idx, ok := e.byID[id] + if !ok { + return + } + step := &e.steps[idx] + end := e.now() + step.EndedAt = &end + if isError { + step.Status = "error" + } else { + step.Status = "complete" + } + if s := summaryForEnd(call.Name, call.Arguments, result, isError); s != "" { + step.Summary = s + } + e.fire(ctx, "end", *step) +} + +// newStep mints + appends a running step and returns it (by value). +func (e *stepEmitter) newStep(name string, args json.RawMessage) tool.Step { + e.seq++ + step := tool.Step{ + ID: fmt.Sprintf("s%d", e.seq), + Kind: kindForTool(name), + Title: name, + Summary: summaryForStart(name, args), + Status: "running", + StartedAt: e.now(), + } + e.byID[step.ID] = len(e.steps) + e.steps = append(e.steps, step) + return step +} + +// matchPending pops the oldest running step id for (name, args). Falls back to +// the OLDEST still-running step of the same tool name when the args don't +// byte-match between start and end (e.g. JSON key reordering). FIFO on the +// fallback too, consistent with the per-key queue pop above — closing the +// oldest avoids mis-correlating concurrent same-named calls. Returns "" on no +// match. +func (e *stepEmitter) matchPending(name string, args json.RawMessage) string { + key := corrKey(name, args) + if q := e.pending[key]; len(q) > 0 { + id := q[0] + if len(q) == 1 { + delete(e.pending, key) + } else { + e.pending[key] = q[1:] + } + return id + } + for i := 0; i < len(e.steps); i++ { + if e.steps[i].Title == name && e.steps[i].Status == "running" { + return e.steps[i].ID + } + } + return "" +} + +func (e *stepEmitter) fire(ctx context.Context, phase string, step tool.Step) { + if e.onStep == nil { + return + } + e.onStep(ctx, tool.StepEvent{Phase: phase, Step: step}) +} + +// snapshot returns a copy of the ordered, deduplicated step set for the +// run Result. A step still "running" at run end (e.g. the run was +// cancelled mid-tool-call) is reported as-is. +func (e *stepEmitter) snapshot() []tool.Step { + if e == nil || len(e.steps) == 0 { + return nil + } + out := make([]tool.Step, len(e.steps)) + copy(out, e.steps) + return out +} + +// kindForTool maps a tool name to an open-vocabulary step kind. Unknown +// tools fall back to "tool" — never an error, just a generic step (the +// client maps unknown kinds to a default icon). Loosely tracks the +// catalog in pkg/skilltools/CLAUDE.md. +func kindForTool(name string) string { + switch name { + case "web_search", "search_reddit", "wikipedia_summary": + return "search" + case "read_page", "read_pdf", "read_reddit", "read_video", "verify_url", + "summary_summarise", "summarize", "file_get_text", "file_get_metadata", + "http_get", "http_post", "http_get_stream", "http_stream_read": + return "read" + case "code_exec", "calculate": + return "code" + case "file_save", "file_get", "file_list", "file_delete", "file_search": + return "file" + case "kv_get", "kv_set", "kv_list", "kv_delete", + "remember", "recall", "chatbot_get_memories": + return "memory" + case "query", "query_research", "deepresearch", "animate", + "agent_invoke", "agent_invoke_parallel", "agent_spawn", + "agent_spawn_parallel", "skill_invoke", "skill_invoke_parallel": + return "delegate" + case "think": + return "thinking" + default: + switch { + case strings.HasPrefix(name, "image") || strings.Contains(name, "draw"): + return "image" + default: + return "tool" + } + } +} + +// summaryForStart builds the human present-tense running summary. It +// derives specifics from safe arg fields only; secret-bearing tools +// (mcp_call, email_send, http_*) are summarized without echoing args. +func summaryForStart(name string, args json.RawMessage) string { + var s string + switch name { + case "web_search": + if q := argString(args, "query", "q"); q != "" { + s = fmt.Sprintf("Searching the web for %q", q) + } else { + s = "Searching the web" + } + case "search_reddit": + if q := argString(args, "query", "q"); q != "" { + s = fmt.Sprintf("Searching Reddit for %q", q) + } else { + s = "Searching Reddit" + } + case "wikipedia_summary": + if q := argString(args, "query", "title"); q != "" { + s = fmt.Sprintf("Looking up %q on Wikipedia", q) + } else { + s = "Looking up Wikipedia" + } + case "read_page", "read_pdf", "read_reddit", "read_video", "verify_url": + if u := argString(args, "url", "post", "page"); u != "" { + s = "Reading " + hostOf(u) + } else { + s = "Reading a page" + } + case "http_get", "http_post", "http_get_stream": + // Show host only — a full URL can embed credentials/tokens. + if u := argString(args, "url"); u != "" { + s = "Fetching " + hostOf(u) + } else { + s = "Making an HTTP request" + } + case "summary_summarise", "summarize": + s = "Summarizing text" + case "translate": + if lang := argString(args, "target_lang", "target_language", "lang"); lang != "" { + s = "Translating to " + lang + } else { + s = "Translating text" + } + case "code_exec": + s = "Running code" + case "calculate": + if q := argString(args, "query", "expression", "expr"); q != "" { + s = "Calculating " + truncateStep(q, 60) + } else { + s = "Calculating" + } + case "remember": + // Never echo the stored value. + s = "Saving a memory" + case "recall", "chatbot_get_memories": + s = "Recalling memories" + case "kv_get", "kv_list": + s = "Reading saved data" + case "kv_set": + s = "Saving data" + case "kv_delete": + s = "Deleting saved data" + case "file_save": + if n := argString(args, "name", "filename"); n != "" { + s = "Saving file " + truncateStep(n, 60) + } else { + s = "Saving a file" + } + case "file_get", "file_get_text", "file_get_metadata": + s = "Reading a file" + case "file_list", "file_search": + s = "Listing files" + case "query", "query_research": + if q := argString(args, "query", "question", "prompt", "task"); q != "" { + s = "Researching " + truncateStep(q, 80) + } else { + s = "Researching" + } + case "deepresearch": + s = "Running deep research" + case "animate": + s = "Generating an animation" + case "agent_invoke", "agent_spawn": + if a := argString(args, "agent", "agent_name", "name"); a != "" { + s = "Delegating to " + a + } else { + s = "Delegating to a sub-agent" + } + case "agent_invoke_parallel", "agent_spawn_parallel": + s = "Delegating to sub-agents" + case "skill_invoke": + if sk := argString(args, "skill_name", "skill", "name"); sk != "" { + s = "Running skill " + sk + } else { + s = "Running a skill" + } + case "skill_invoke_parallel": + s = "Running skills in parallel" + case "think": + s = "Thinking" + case "mcp_call": + // Redact: MCP args frequently carry secrets. Name server/tool only. + srv, tl := argString(args, "server"), argString(args, "tool") + switch { + case srv != "" && tl != "": + s = fmt.Sprintf("Calling %s/%s", srv, tl) + case srv != "": + s = "Calling " + srv + default: + s = "Calling an MCP tool" + } + case "email_send": + // Redact recipients + body. + s = "Sending an email" + default: + s = "Using " + name + } + return truncateStep(s, stepSummaryMaxLen) +} + +// summaryForEnd optionally upgrades the summary to a cheap result phrase. +// Returns "" to keep the running summary (the caller then just flips the +// status). Never returns a phrase derived from raw result bytes. +func summaryForEnd(name string, _ json.RawMessage, result string, isError bool) string { + if isError { + return "" + } + switch name { + case "web_search", "search_reddit": + if n := countResults(result); n >= 0 { + return fmt.Sprintf("Found %d result%s", n, plural(n)) + } + } + return "" +} + +// argString pulls the first present non-empty string field from a tool's +// raw JSON args, trying keys in order. Returns "" when none parse. +func argString(args json.RawMessage, keys ...string) string { + if len(args) == 0 { + return "" + } + var m map[string]any + if err := json.Unmarshal(args, &m); err != nil { + return "" + } + for _, k := range keys { + if v, ok := m[k]; ok { + if s, ok := v.(string); ok && strings.TrimSpace(s) != "" { + return strings.TrimSpace(s) + } + } + } + return "" +} + +// countResults parses a v11-style {"results":[...]} envelope and returns +// the count, or -1 when the shape doesn't match. +func countResults(result string) int { + if strings.TrimSpace(result) == "" { + return -1 + } + var env struct { + Results []json.RawMessage `json:"results"` + } + if err := json.Unmarshal([]byte(result), &env); err != nil || env.Results == nil { + return -1 + } + return len(env.Results) +} + +// hostOf returns the bare host (no leading www.) of a URL, or a short +// form of the raw string when it doesn't parse as a URL. +func hostOf(raw string) string { + if u, err := url.Parse(raw); err == nil && u.Host != "" { + return strings.TrimPrefix(u.Host, "www.") + } + return truncateStep(raw, 60) +} + +// truncateStep rune-safely caps s to max, appending an ellipsis when cut. +func truncateStep(s string, max int) string { + if max <= 0 { + return "" + } + r := []rune(s) + if len(r) <= max { + return s + } + if max == 1 { + return string(r[:1]) + } + return string(r[:max-1]) + "…" +} + +func plural(n int) string { + if n == 1 { + return "" + } + return "s" +} diff --git a/run/submit.go b/run/submit.go new file mode 100644 index 0000000..59ae221 --- /dev/null +++ b/run/submit.go @@ -0,0 +1,84 @@ +package run + +import ( + "context" + "strings" + "sync" + + llm "gitea.stevedudenhoeffer.com/steve/majordomo/llm" +) + +// SubmitCapture records the output a run's `submit` tool received. +// +// Why this exists: legacy agentkit injected a synthetic `submit` tool and +// ended the loop when it fired; years of mort system prompts (agent +// YAMLs, skill manifests, the executors' platform headers) teach the +// model to "call submit with your final answer". majordomo's agent loop +// has no submit concept — it ends when the model replies WITHOUT tool +// calls. Dropping submit cold would make every prompt-trained model +// burn turns on "unknown tool \"submit\"" errors. +// +// The compatibility shape: the executors add NewSubmitTool's tool to +// every run's toolset (unless the palette already defines a `submit`). +// The handler records the FIRST submitted answer and tells the model +// the answer was accepted so its next turn is a bare reply (which ends +// the loop naturally). After the run, the executor consults +// Output(loopOutput, runErr): a captured submission wins over an empty +// or budget-exhausted ending, so a model that submits on its final +// allowed step still produces its answer instead of ErrMaxSteps. +type SubmitCapture struct { + mu sync.Mutex + output string + called bool +} + +// Record stores the first submitted answer; later calls are ignored +// (matching legacy agentkit's "multiple calls keep the first" contract). +func (c *SubmitCapture) Record(output string) { + c.mu.Lock() + defer c.mu.Unlock() + if c.called { + return + } + c.called = true + c.output = output +} + +// Submitted returns the captured answer and whether submit fired. +func (c *SubmitCapture) Submitted() (string, bool) { + c.mu.Lock() + defer c.mu.Unlock() + return c.output, c.called +} + +// Output resolves the run's final output: the submitted answer when the +// model called submit (parity with legacy agentkit, where submit's argument +// WAS the run output), otherwise the loop's own final text. resolvedErr +// is nil when a submission exists — a run that submitted its answer and +// then ran out of steps (or timed out composing the courtesy +// confirmation turn) is a SUCCESS, not an error. +func (c *SubmitCapture) Output(loopOutput string, runErr error) (output string, resolvedErr error) { + if out, ok := c.Submitted(); ok { + return out, nil + } + return loopOutput, runErr +} + +// submitArgs mirrors legacy agentkit's synthetic submit tool schema so +// models prompted under the old contract emit compatible calls. +type submitArgs struct { + Output string `json:"output" description:"The final answer, summary, or output for this task."` +} + +// NewSubmitTool builds the compatibility `submit` tool bound to the +// given capture. Both executors (skill + agent) install one per run. +func NewSubmitTool(capture *SubmitCapture) llm.Tool { + return llm.DefineTool[submitArgs]( + "submit", + "Submit your final answer or output to end this task. Call exactly once when you are done.", + func(_ context.Context, args submitArgs) (any, error) { + capture.Record(strings.TrimSpace(args.Output)) + return "Final answer recorded. Do not call any more tools; reply now with a brief closing message.", nil + }, + ) +}