P2: run kernel + run.Ports inversion — executus is runnable #2

Merged
steve merged 9 commits from phase-2-run-kernel into main 2026-06-27 02:02:21 +00:00
16 changed files with 1863 additions and 42 deletions
+15 -17
View File
@@ -1,10 +1,11 @@
# Gadfly — agentic adversarial PR reviewer (https://gitea.stevedudenhoeffer.com/steve/gadfly).
#
# Runs the published Gadfly image (pinned to :v1) as a specialist swarm and posts
# Runs the published Gadfly image (pinned to an immutable :sha- tag — act_runner
# caches :latest, and this build is what carries foreman provider-type support)
# as a specialist swarm and posts
# ONE consolidated review comment as gitea-actions. Advisory only — never blocks a
# merge. This reviews executus PRs (same setup as mort: m1/m5 locals + 2 cloud,
# 3-lens suite). Gadfly is a simple system — treat its findings as advisory and
# double-check before acting.
# merge. This reviews executus PRs with 3 ollama-cloud models (3-lens suite). Gadfly
# is a simple system — findings are advisory; always double-check before acting.
name: Adversarial Review (Gadfly)
@@ -40,24 +41,21 @@ jobs:
|| github.actor == 'fizi'
|| github.actor == 'dazed'))
runs-on: ubuntu-latest
# Full fleet (2 cloud + 2 local Macs, all running concurrently) reviewing
# every PR with the 3-lens suite — the slow local lanes dominate wall time.
timeout-minutes: 90
# 3 cloud models, all concurrent, 3-lens suite. ~12 min typical.
timeout-minutes: 30
steps:
- uses: docker://gitea.stevedudenhoeffer.com/steve/gadfly:v1
- uses: docker://gitea.stevedudenhoeffer.com/steve/gadfly:sha-6e3a83c
env:
GITEA_API: ${{ github.server_url }}/api/v1/repos/${{ github.repository }}
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
OLLAMA_CLOUD_API_KEY: ${{ secrets.OLLAMA_CLOUD_API_KEY }}
# Local Ollama boxes (each its own lane, cap 1). NOTE: both Macs must be
# awake/reachable for their reviews to run; if a box is offline, that
# model's comment shows an error and the others still post.
GADFLY_ENDPOINT_M1PRO: "ollama|http://192.168.0.175:11434"
GADFLY_ENDPOINT_M5MAX: "ollama|http://192.168.0.173:11434"
# 2 cloud (parallel) + M1 Pro + M5 Max — one consolidated comment each.
GADFLY_MODELS: "minimax-m3:cloud,deepseek-v4-flash:cloud,m1pro/qwen3:14b,m5max/qwen3.6:35b-mlx"
# cloud runs 2 at once; each Mac one at a time; all three lanes parallel.
GADFLY_PROVIDER_CONCURRENCY: "ollama-cloud=2,m1pro=1,m5max=1"
# executus uses CLOUD MODELS ONLY. The local Macs (m1/m5) were dropped:
# on a P2-review measurement they took 2629 min (with lens timeouts)
# and contributed ZERO real findings — the two cloud models found every
# genuine bug in 612 min. Cloud-only is faster AND higher-signal.
# 3 cloud models, one consolidated comment each, all run in parallel.
GADFLY_MODELS: "minimax-m3:cloud,deepseek-v4-flash:cloud,glm-5.2:cloud"
GADFLY_PROVIDER_CONCURRENCY: "ollama-cloud=3"
# Default => the 3-lens suite (security, correctness, error-handling).
# Set the repo var GADFLY_SPECIALISTS to override (csv / "all" / "auto").
GADFLY_SPECIALISTS: ${{ vars.GADFLY_SPECIALISTS || 'security,correctness,error-handling' }}
+8 -3
View File
@@ -43,8 +43,13 @@ CORE (majordomo + stdlib):
fanout/ programmatic N×M swarm [P0 ✓]
deliver/ output egress seam (+ Discard/Stdout) [P0 ✓]
identity/ caller identity seams [P0 ✓]
run/ progress bridge now; the executor kernel + [P0 partial]
nil-safe Ports + RunnableAgent later [P2]
run/ run.Executor is RUNNABLE: model-resolve + [P2 core ✓]
toolbox + majordomo loop + compaction +
run-bounding (V10 detached timeout) + step/
audit observers + Budget gate; RunnableAgent
DTO + nil-safe run.Ports. Follow-ups: wire
Critic/Checkpointer/PaletteSource/Delivery,
Phases, and the no-tools direct path [P2]
dispatchguard/ loop/depth/fan-out caps [P0 ✓]
pendingattach/ attachment dedupe [P0 ✓]
tool/ registry + 3-stage permissions + ssrf [P1 ✓]
@@ -52,7 +57,7 @@ CORE (majordomo + stdlib):
(convar->config.Source; UsageSink/TraceSink seams; GenerateWith[T]
structured output — no separate structured/ pkg)
llmmeta/ shared meta-LLM helper over model/ [P1 ✓]
compact/ context compactor (WithCompactor hook) [P2]
compact/ context compactor (WithCompactor hook) [P2]
tools/{web,net,store,compose,meta,comms} generic tools [P3]
BATTERIES (opt-in siblings, each nil-safe + a default):
+13 -5
View File
@@ -31,15 +31,23 @@ bot) — mort and gadfly are the first two consumers (heavy and light). See
[mort]: https://gitea.stevedudenhoeffer.com/steve/mort
**Available today (P0):**
**Available today:**
- `run/`**executus is runnable.** `run.Executor` ties model resolution, the
tool registry, majordomo's agent loop, context compaction, run-bounding, and
step/audit instrumentation into one `Run(ctx, RunnableAgent, inv) Result`, with
every host concern behind a nil-safe `run.Ports` (Audit/Budget/Critic/
Checkpointer/PaletteSource/Delivery). See `examples/minimal`.
- `model/` — config-driven tier resolution + failover over majordomo, with
pluggable `UsageSink`/`TraceSink` and `GenerateWith[T]` structured output.
- `tool/` — the tool registry + 3-stage permission model + SSRF guard.
- `compact/` — the per-run context compactor.
- `lane/` — bounded worker pool with fair-share queueing (run- and
provider-concurrency).
- `fanout/` — programmatic N×M swarm with bounded global + per-key concurrency.
- `config/` — the host config seam (`Source`) with an env-var default.
- `deliver/` — the output-egress seam with `Discard`/`Stdout` defaults.
- `identity/` — caller-identity seams (`AdminPolicy`, `MemberResolver`).
- `dispatchguard/`, `pendingattach/`, `run/progress.go` — run-safety primitives.
- `config/`, `deliver/`, `identity/` — host seams (config / output / identity),
each with a shipped default.
- `dispatchguard/`, `pendingattach/` — run-safety primitives.
## Design
+369
View File
@@ -0,0 +1,369 @@
// V15.2 context compactor (re-based on majordomo).
//
// Why: the agent loop accumulates tool results indefinitely. A
// research-heavy run with many web_search / read_page / http_get
// results easily crosses 200K tokens and trips the model's HTTP-400
// "prompt too long" rejection mid-run (observed at 410K tokens on
// qwen3-coder:480b which has a 262K cap). majordomo's agent loop calls
// the compactor with the full message slice before every model call;
// the compactor returns a shorter slice that preserves the system
// prompt + recent messages, with the middle range replaced by a
// synthetic summary.
//
// Strategy (unchanged from the agentkit era):
// - Keep any leading system message verbatim. (Under majordomo the
// system prompt normally travels in Request.System, not in the
// message slice, so this is defensive.)
// - Keep the last KeepRecent messages verbatim. This ensures the
// agent has fresh tool state to act on; compacting too aggressively
// would strip the in-flight context it needs to make the next
// decision.
// - Compress the middle range via a single fast-tier LLM call that
// receives the middle messages as raw text and produces a paragraph
// summary (URLs visited, key findings, file_ids created, what the
// agent is trying to accomplish).
// - Replace the middle range with one synthetic user-role message
// containing the summary. (user-role chosen because tool-result-role
// would be ambiguous without a matching tool_call_id.)
//
// What moved in the majordomo conversion:
// - The token-threshold gate lives HERE now. agentkit estimated
// tokens and only invoked the compactor past a configured
// threshold; majordomo's hook fires before every model call, so
// the threshold check (estimateTokens vs the per-run threshold the
// executor computes from the model's context limit) is the
// compactor's first step.
// - The compactor is per-run STATEFUL: majordomo does not replace
// the loop's internal transcript with the compacted slice (the
// hook shapes only what is SENT), so without memory the middle
// would be re-summarised from scratch on every step past the
// threshold. The state remembers how far the transcript has been
// folded into the running summary and folds the previous summary
// into the next one instead of re-paying for it.
//
// Failure path: any error (LLM unavailable, malformed response, etc.)
// is returned to the agent loop, which treats compactor errors as
// non-fatal and sends the original slice — if the next model call hits
// the provider's limit, the existing HTTP-400 error path takes over.
package compact
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
)
// ModelResolver resolves a tier/spec to a usable llm.Model (and an enriched
// context for usage attribution). model.ParseModelForContext satisfies it.
type ModelResolver func(ctx context.Context, tier string) (context.Context, llm.Model, error)
// Compactor is the per-run compaction hook handed to the agent loop
// (matches the signature agent.WithCompactor expects).
type Compactor = func(ctx context.Context, msgs []llm.Message) ([]llm.Message, error)
// CompactionEvent describes one fired compaction so the executor can
// log it to skill_run_logs ("compaction_fired").
type CompactionEvent struct {
// MessagesBefore/After count the messages that would have been
// sent without/with this compaction.
MessagesBefore int
MessagesAfter int
// TokensBefore/After are the estimateTokens values for the same
// two slices.
TokensBefore int
TokensAfter int
}
// CompactorFactory mints a fresh per-run Compactor bound to a token
// threshold. onFire (nil-safe) observes every compaction that actually
// fires. A non-positive threshold yields a pass-through compactor.
type CompactorFactory func(thresholdTokens int, onFire func(CompactionEvent)) Compactor
// CompactorConfig controls the v15.2 compactor's behaviour. Construct
// in mort.go from convars + the executor's ModelResolver.
type CompactorConfig struct {
// Models resolves a model spec (typically "fast" or a specific tier)
// to an llm.Model. Required; nil disables compaction.
Models ModelResolver
// SummarizerTier is the model tier used for the compression LLM call.
// Production default "fast"; an admin may set "haiku" or a specific
// model spec. Empty falls back to "fast".
SummarizerTier string
// KeepRecent is the number of trailing messages preserved verbatim.
// Default 8. Lower values compact more aggressively (the next loop
// iteration sees less recent context); higher values keep more.
KeepRecent int
// SummaryWordCap bounds the LLM-generated summary length. Default
// 200 words ≈ 800 chars ≈ 200 tokens — small enough that the
// compaction always shrinks the slice meaningfully.
SummaryWordCap int
}
// NewCompactor returns a CompactorFactory implementing the middle-range
// summarisation strategy. nil cfg.Models returns a factory of no-op
// compactors that always return the input unchanged (degrades to
// v15.1 behaviour).
func NewCompactor(cfg CompactorConfig) CompactorFactory {
if cfg.Models == nil {
return func(int, func(CompactionEvent)) Compactor {
return func(_ context.Context, msgs []llm.Message) ([]llm.Message, error) {
return msgs, nil
}
}
}
if cfg.SummarizerTier == "" {
cfg.SummarizerTier = "fast"
}
if cfg.KeepRecent <= 0 {
cfg.KeepRecent = 8
}
if cfg.SummaryWordCap <= 0 {
cfg.SummaryWordCap = 200
}
return func(threshold int, onFire func(CompactionEvent)) Compactor {
st := &compactionState{}
return func(ctx context.Context, msgs []llm.Message) ([]llm.Message, error) {
return compactIfNeeded(ctx, cfg, st, threshold, onFire, msgs)
}
}
}
// compactionState is the per-run fold memory: msgs[:prefixEnd]
// (excluding a leading system message) are represented by summaryText
// in the rendered slice. Guarded by mu for safety although the agent
// loop invokes the hook from a single goroutine.
type compactionState struct {
mu sync.Mutex
prefixEnd int
summaryText string
}
// compactIfNeeded is the workhorse: render the transcript with any
// existing fold applied, check the threshold, and fold more of the
// middle into the summary when the rendered size still exceeds it.
func compactIfNeeded(ctx context.Context, cfg CompactorConfig, st *compactionState,
threshold int, onFire func(CompactionEvent), msgs []llm.Message) ([]llm.Message, error) {
st.mu.Lock()
defer st.mu.Unlock()
rendered := renderCompacted(st, msgs)
if threshold <= 0 {
return rendered, nil
}
tokensBefore := estimateTokens(rendered)
if tokensBefore < threshold {
return rendered, nil
}
// Determine the new middle range to fold: everything between what
// is already summarised (or the optional leading system message)
// and the KeepRecent tail.
startMiddle := st.prefixEnd
if startMiddle == 0 && len(msgs) > 0 && msgs[0].Role == llm.RoleSystem {
startMiddle = 1
}
endMiddle := len(msgs) - cfg.KeepRecent
if endMiddle <= startMiddle {
// Nothing new to fold (the tail alone exceeds the threshold).
// Return the rendered slice; the model call may still succeed.
return rendered, nil
}
middle := msgs[startMiddle:endMiddle]
summary, err := summariseMiddle(ctx, cfg, st.summaryText, middle)
if err != nil {
// Non-fatal upstream: the agent loop sends the ORIGINAL slice. Return
// msgs, not `rendered` — on a second+ compaction `rendered` already
// carries a prior synthetic summary, which is not the documented
// "original slice" the loop expects on a compactor error.
return msgs, fmt.Errorf("compactor: summarise middle: %w", err)
}
st.summaryText = summary
st.prefixEnd = endMiddle
out := renderCompacted(st, msgs)
if onFire != nil {
onFire(CompactionEvent{
MessagesBefore: len(rendered),
MessagesAfter: len(out),
TokensBefore: tokensBefore,
TokensAfter: estimateTokens(out),
})
}
return out, nil
}
// renderCompacted applies the fold state to msgs: [optional system] +
// [synthetic summary] + msgs[prefixEnd:]. With no fold yet, msgs is
// returned unchanged.
func renderCompacted(st *compactionState, msgs []llm.Message) []llm.Message {
if st.prefixEnd <= 0 || st.prefixEnd > len(msgs) {
return msgs
}
tail := msgs[st.prefixEnd:]
out := make([]llm.Message, 0, len(tail)+2)
if msgs[0].Role == llm.RoleSystem {
out = append(out, msgs[0])
}
out = append(out, llm.UserText(
"[CONTEXT COMPACTED] The earlier portion of this conversation was summarised "+
"to fit the model's context window. Summary:\n\n"+st.summaryText+
"\n\nResume from the recent messages below."))
out = append(out, tail...)
return out
}
// summariseMiddle composes a "compress this transcript" prompt and
// fires one fast-tier LLM call. prevSummary (may be empty) is the
// running summary from earlier compactions; it is folded into the new
// summary so prior context is not lost.
func summariseMiddle(ctx context.Context, cfg CompactorConfig, prevSummary string, middle []llm.Message) (string, error) {
if len(middle) == 0 {
return "", errors.New("compactor: empty middle range")
}
modelCtx, model, err := cfg.Models(ctx, cfg.SummarizerTier)
if err != nil {
return "", fmt.Errorf("compactor: resolve summarizer model %q: %w", cfg.SummarizerTier, err)
}
if model == nil {
return "", errors.New("compactor: summarizer model resolved to nil")
}
if modelCtx != nil {
ctx = modelCtx
}
var prior string
if strings.TrimSpace(prevSummary) != "" {
prior = "AN EARLIER PORTION WAS ALREADY SUMMARISED AS:\n" + prevSummary +
"\n\nFold that summary into your new one — its facts must survive.\n\n"
}
transcript := renderTranscript(middle)
prompt := fmt.Sprintf(
"You are compressing an in-flight agent's conversation transcript so the agent "+
"can continue working without blowing its model context. The transcript below is "+
"a sequence of tool calls and their results. Produce a single paragraph (under %d words) "+
"that captures:\n"+
" - WHAT the agent has been trying to accomplish.\n"+
" - WHICH URLs were visited / fetched (list inline, comma-separated).\n"+
" - KEY findings or decisions (factual results the agent will need later).\n"+
" - ANY file_ids or KV keys the agent created — these are persistent state references the agent must keep.\n"+
" - ANY errors or dead-ends that the agent should not re-try.\n"+
"DO NOT include verbose HTTP headers, tool-call metadata, error stack traces, or repetitive content. "+
"DO NOT add commentary or markdown headers. Output prose only.\n\n"+
"%sTRANSCRIPT TO COMPRESS:\n%s",
cfg.SummaryWordCap,
prior,
transcript,
)
resp, err := model.Generate(ctx, llm.Request{Messages: []llm.Message{llm.UserText(prompt)}})
if err != nil {
return "", fmt.Errorf("compactor: summarise LLM call: %w", err)
}
text := strings.TrimSpace(resp.Text())
if text == "" {
return "", errors.New("compactor: summarizer returned empty text")
}
return text, nil
}
// estimateTokens is the chars/4 heuristic over a message slice's text,
// tool calls, and tool results. Images count a flat ~1K tokens each.
// It intentionally matches the coarse estimator the old agentkit loop
// used — the 0.7 threshold ratio provides the safety margin.
func estimateTokens(msgs []llm.Message) int {
chars := 0
for _, m := range msgs {
for _, p := range m.Parts {
switch v := p.(type) {
case llm.TextPart:
chars += len(v.Text)
case llm.ImagePart:
chars += 4096
default:
// llm.Part is a sealed-but-extensible interface (future media
// kinds). Count an unknown part conservatively (like an image)
// rather than 0, so a transcript of unrecognised content can't
// silently slip under the compaction threshold and 400 the
// model. Bump this if a large new part kind lands.
_ = v
chars += 4096
}
}
for _, tc := range m.ToolCalls {
chars += len(tc.Name) + len(tc.Arguments)
}
for _, tr := range m.ToolResults {
chars += len(tr.Content)
}
}
return chars / 4
}
// transcriptMessageCap bounds individual message bodies at ~2KB so a
// single ultra-long tool result can't dominate the prompt sent to the
// summarizer.
const transcriptMessageCap = 2048
// secretBearingTools name tools whose ARGUMENTS routinely carry credentials or
// message bodies (bearer tokens, API keys, recipients, request bodies). Their
// args are dropped before the transcript reaches the summarizer model — which
// may be a different provider/tier than the run model — mirroring the redaction
// run/steps.go applies to user-facing step summaries. http_* and webhook_* are
// matched by prefix below.
var secretBearingTools = map[string]bool{
"mcp_call": true,
"email_send": true,
}
// redactToolArgs returns a summariser-safe rendering of a tool call's args:
// "[redacted]" for known secret-bearing tools, the args verbatim otherwise.
func redactToolArgs(name, args string) string {
if secretBearingTools[name] ||
strings.HasPrefix(name, "http_") ||
strings.HasPrefix(name, "webhook_") {
return "[redacted]"
}
return args
}
// renderTranscript flattens a message slice to a plain-text transcript
// suitable for the summarisation prompt. Tool calls show name + (redacted) args,
// tool results show name + body. Empty fields are skipped.
//
// NOTE: tool-RESULT bodies are forwarded to the summarizer (the summary needs
// the findings). A host whose tool results may contain secrets and whose
// summarizer tier resolves to an untrusted provider should ensure that tier is
// trusted, or pre-sanitise results before they reach the agent loop.
func renderTranscript(msgs []llm.Message) string {
var sb strings.Builder
for i, m := range msgs {
fmt.Fprintf(&sb, "---\n[%d] role=%s\n", i+1, m.Role)
if text := m.Text(); text != "" {
sb.WriteString(truncate(text, transcriptMessageCap))
sb.WriteString("\n")
}
for _, tc := range m.ToolCalls {
fmt.Fprintf(&sb, "tool_call name=%s args=%s\n", tc.Name, truncate(redactToolArgs(tc.Name, string(tc.Arguments)), transcriptMessageCap))
}
for _, tr := range m.ToolResults {
fmt.Fprintf(&sb, "tool_result name=%s body=%s\n", tr.Name, truncate(tr.Content, transcriptMessageCap))
}
}
return sb.String()
}
func truncate(s string, n int) string {
if len(s) <= n {
return s
}
return s[:n] + "...(truncated)"
}
+35 -13
View File
@@ -1,27 +1,49 @@
// Command minimal demonstrates executus's standalone core primitives available
// today (P0): the config seam + bounded fan-out. The full zero-config "agentic
// in ~12 lines" example arrives once the model, tool, and run packages land
// (P1P3).
// Command minimal is executus's "hello, agentic world": wire a model resolver,
// a tool registry, and the run executor, then run an agent. With no batteries
// (Audit/Budget/Critic/Checkpointer/Palette/Delivery all nil) this is a
// bounded, in-memory run — the light-host shape (gadfly's case).
//
// Run it with a provider key for the configured tier, e.g.
//
// ANTHROPIC_API_KEY=sk-... go run ./examples/minimal
//
// Override a tier from the environment without touching code, e.g.
//
// EXECUTUS_MODEL_TIER_FAST=openai/gpt-4o-mini ANTHROPIC_API_KEY= OPENAI_KEY=sk-... go run ./examples/minimal
package main
import (
"context"
"fmt"
"log"
"gitea.stevedudenhoeffer.com/steve/executus/config"
"gitea.stevedudenhoeffer.com/steve/executus/fanout"
"gitea.stevedudenhoeffer.com/steve/executus/model"
"gitea.stevedudenhoeffer.com/steve/executus/run"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
func main() {
cfg := config.Env("EXECUTUS_") // e.g. EXECUTUS_FANOUT_MAX_CONCURRENT=8
max := cfg.Int("fanout.max_concurrent", 4)
// 1. Configure model tiers: live values come from the environment
// (EXECUTUS_MODEL_TIER_<NAME>), falling back to these defaults.
model.Configure(config.Env("EXECUTUS_"), map[string]string{
"fast": "anthropic/claude-haiku-4-5",
"thinking": "anthropic/claude-opus-4-8",
}, 0)
items := []string{"alpha", "beta", "gamma", "delta"}
results := fanout.Run(context.Background(), items,
fanout.Options[string]{MaxConcurrent: max},
func(_ context.Context, s string) (int, error) { return len(s), nil })
// 2. Build the executor: a tool registry + the model resolver. No batteries.
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: model.ParseModelForContext,
})
for _, r := range results {
fmt.Printf("%-6s -> %d (err=%v)\n", items[r.Index], r.Value, r.Err)
// 3. Run an agent and print its answer.
res := ex.Run(context.Background(),
run.RunnableAgent{Name: "assistant", SystemPrompt: "You are concise.", ModelTier: "fast"},
tool.Invocation{RunID: "demo-1", CallerID: "local"},
"In one sentence, what is an agent harness?")
if res.Err != nil {
log.Fatalf("run failed: %v", res.Err)
}
fmt.Println(res.Output)
}
+4 -1
View File
@@ -34,6 +34,7 @@ import (
"context"
"encoding/json"
"fmt"
"log/slog"
"strings"
"time"
@@ -574,7 +575,9 @@ func (h *Helper) recordLedger(ctx context.Context, call MetaCall) {
if h.storage == nil {
return
}
_ = h.storage.RecordMetaCall(ctx, call)
if err := h.storage.RecordMetaCall(ctx, call); err != nil {
slog.Warn("llmmeta: failed to record ledger row", "err", err)
}
}
// tryParseJSON attempts to decode text as JSON. Returns the parsed
+1 -1
View File
@@ -237,7 +237,7 @@ func recordUsage(ctx context.Context, resp *llm.Response) {
return
}
u := resp.Usage
if u.InputTokens == 0 && u.OutputTokens == 0 {
if u.InputTokens == 0 && u.OutputTokens == 0 && u.CacheReadTokens == 0 && u.CacheWriteTokens == 0 {
return
}
model := resolvedModelName(ctx, resp)
+23 -2
View File
@@ -314,7 +314,7 @@ func (c *CloudOllamaLimitCache) fetchTags(ctx context.Context) ([]string, error)
return nil, err
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
body, err := readCapped(resp.Body)
if err != nil {
return nil, err
}
@@ -367,7 +367,7 @@ func (c *CloudOllamaLimitCache) fetchContextLength(ctx context.Context, modelNam
return 0, err
}
defer resp.Body.Close()
respBody, err := io.ReadAll(resp.Body)
respBody, err := readCapped(resp.Body)
if err != nil {
return 0, err
}
@@ -451,3 +451,24 @@ func truncate(b []byte, n int) string {
}
return string(b[:n]) + "...(truncated)"
}
// maxLimitCacheResponseBytes bounds the ollama.com limit-cache HTTP responses
// (/api/tags, /api/show) so a misbehaving endpoint can't stream an unbounded
// body before the 15s timeout fires. 1 MiB is far above any real response.
const maxLimitCacheResponseBytes = 1 << 20
// readCapped reads up to maxLimitCacheResponseBytes from r and returns a clear
// error if the response EXCEEDS the cap — rather than silently truncating (as a
// bare io.LimitReader does) and letting downstream json.Unmarshal fail with an
// opaque "unexpected end of JSON input". It reads one extra byte to detect the
// overflow.
func readCapped(r io.Reader) ([]byte, error) {
body, err := io.ReadAll(io.LimitReader(r, maxLimitCacheResponseBytes+1))
if err != nil {
return nil, err
}
if len(body) > maxLimitCacheResponseBytes {
return nil, fmt.Errorf("cloud_sync: response exceeded %d bytes", maxLimitCacheResponseBytes)
}
return body, nil
}
+5
View File
@@ -16,6 +16,11 @@ import (
// UsageSink receives one record per successful Generate through a model parsed
// by this package (ParseModelRequest / ParseModelForContext). Implement it to
// meter or bill; the token detail mirrors majordomo's Response.Usage.
//
// IMPORTANT: cacheReadTokens and cacheWriteTokens are PORTIONS of inputTokens,
// not independent additive values (they let a sink price cached vs fresh input
// differently). A sink must NOT compute total = input+output+cacheRead+
// cacheWrite — that double-counts the cached input.
type UsageSink interface {
Record(ctx context.Context, model string, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens int)
}
+76
View File
@@ -0,0 +1,76 @@
package run
import "time"
// RunnableAgent is the kernel's view of "a thing to run": an identity, a model
// tier, a system prompt, execution caps, and a tool palette. It is a plain DTO
// on purpose — the run kernel never imports a noun battery. The persona Agent
// and the saved Skill each LOWER themselves into a RunnableAgent (a ToRunnable
// method on the battery side), and the kernel runs the DTO. This is the
// inversion of mort's agentexec.Executor.Run(*agents.Agent): the executor no
// longer depends on the persona struct, only on this shape.
//
// A light host can build a RunnableAgent inline (model tier + prompt + a few
// tool names) for a one-shot bounded run, with no persona or skill battery at
// all — that is exactly gadfly's swarm task.
type RunnableAgent struct {
// ID is a stable identifier for the run subject (an agent/skill UUID, or
// any host-chosen id). Used for audit attribution and dispatch-guard
// genealogy. Empty is allowed for anonymous one-shot runs.
ID string
// Name is a human label (audit/logs/delivery). Empty is allowed.
Name string
// SystemPrompt is the agent's base system prompt (before per-run
// personalization, which a host layers via Ports).
SystemPrompt string
// ModelTier is a tier alias or concrete spec resolved through
// model.ParseModelForContext. Empty resolves to the host's default tier.
ModelTier string
// MaxIterations caps the agent loop's tool-dispatch steps. 0 = kernel
// default. MaxRuntime caps wall-clock for the whole run (the kernel starts
// this clock AFTER any lane dequeue, not at submission). 0 = kernel
// default.
MaxIterations int
MaxRuntime time.Duration
// LowLevelTools are tool-registry names the run may call directly.
// SkillPalette / SubAgentPalette name saved skills / sub-agents exposed as
// skill__<name> / agent__<name> delegation tools, resolved through
// Ports.Palette (nil Palette => those entries are inert).
LowLevelTools []string
SkillPalette []string
SubAgentPalette []string
// Phases optionally model a multi-step pipeline (each phase its own prompt
// + tier + tools). An empty slice is a single-phase run — the common case.
Phases []Phase
// Critic configures the optional two-tier run-critic (Ports.Critic). The
// zero value (disabled) is the light-host default.
Critic CriticConfig
}
// Phase is one step of a multi-step run: its own system prompt, model tier,
// iteration cap, and tool subset. Optional phases may be skipped by the
// pipeline when their precondition isn't met.
type Phase struct {
Name string
SystemPrompt string
ModelTier string
MaxIterations int
Tools []string
Optional bool
}
// CriticConfig configures the optional run-critic. Enabled gates whether a
// critic monitor is started at all; BackstopMultiplier sets the hard-kill
// deadline as a multiple of the soft trigger (MaxRuntime). A non-positive
// multiplier uses the kernel default.
type CriticConfig struct {
Enabled bool
BackstopMultiplier float64
}
+318
View File
@@ -0,0 +1,318 @@
package run
import (
"context"
"errors"
"fmt"
"time"
"gitea.stevedudenhoeffer.com/steve/majordomo/agent"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/executus/compact"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
// ModelResolver resolves a tier alias or concrete spec to a usable llm.Model
// and an enriched context (for usage attribution). model.ParseModelForContext
// satisfies it.
type ModelResolver func(ctx context.Context, tier string) (context.Context, llm.Model, error)
// Defaults are the executor's fallback caps and loop guards, applied per run
// when the RunnableAgent leaves a field zero.
type Defaults struct {
MaxIterations int // tool-dispatch steps; default 12
MaxRuntime time.Duration // wall-clock per run; default 60s
FallbackTier string // tier when the agent's is empty; default "fast"
MaxConsecutiveToolErrors int // loop guard; default 3
MaxSameToolCallRepeats int // retry-storm guard; default 3
CompactionThresholdRatio float64 // fraction of model context to compact at; default 0.7
}
func (d Defaults) withFallbacks() Defaults {
if d.MaxIterations <= 0 {
d.MaxIterations = 12
}
if d.MaxRuntime <= 0 {
d.MaxRuntime = 60 * time.Second
}
if d.FallbackTier == "" {
d.FallbackTier = "fast"
}
if d.MaxConsecutiveToolErrors <= 0 {
d.MaxConsecutiveToolErrors = 3
}
if d.MaxSameToolCallRepeats <= 0 {
d.MaxSameToolCallRepeats = 3
}
if d.CompactionThresholdRatio <= 0 {
d.CompactionThresholdRatio = 0.7
}
return d
}
// Config wires an Executor. Registry + Models are required; everything else is
// optional and nil-safe — the zero Config beyond those yields a bounded,
// in-memory run with no persistence/audit/budget/critic/delegation/compaction
// (gadfly's case).
type Config struct {
Registry tool.Registry
Models ModelResolver
Defaults Defaults
Ports Ports
// Compactor mints the per-run context-compaction hook. nil disables
// compaction. ContextTokens resolves a tier's model context-window (for
// the compaction threshold); nil — or a zero return — also disables it.
Compactor compact.CompactorFactory
ContextTokens func(tier string) int
// SystemHeader is an optional platform header prepended to every agent's
// system prompt.
SystemHeader string
}
// Executor runs a RunnableAgent through majordomo's agent loop with the wired
// Ports. Construct with New; safe for concurrent use across runs.
type Executor struct {
cfg Config
}
// New builds an Executor. It panics if Registry or Models is nil — those are
// structural, not runtime, errors.
func New(cfg Config) *Executor {
if cfg.Registry == nil || cfg.Models == nil {
panic("run.New: Registry and Models are required")
}
cfg.Defaults = cfg.Defaults.withFallbacks()
return &Executor{cfg: cfg}
}
// Result is one run's outcome. Err carries the run failure (if any); the other
// fields are populated best-effort even on error (partial output/steps/usage).
type Result struct {
RunID string
Output string
Steps []tool.Step
Usage llm.Usage
Err error
}
// Run executes ra with the given invocation + input and returns the Result. It
// never propagates a panic; failures surface in Result.Err.
func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocation, input string) Result {
started := time.Now()
res := Result{RunID: inv.RunID}
tier := ra.ModelTier
if tier == "" {
tier = e.cfg.Defaults.FallbackTier
}
maxIter := ra.MaxIterations
if maxIter <= 0 {
maxIter = e.cfg.Defaults.MaxIterations
}
maxRuntime := ra.MaxRuntime
if maxRuntime <= 0 {
maxRuntime = e.cfg.Defaults.MaxRuntime
}
// Budget gate (pre-run): a rejected run makes no model call.
if e.cfg.Ports.Budget != nil {
if err := e.cfg.Ports.Budget.Check(ctx, inv.CallerID); err != nil {
res.Err = err
return res
}
}
// Resolve the model (enriches ctx for usage attribution).
modelCtx, model, err := e.cfg.Models(ctx, tier)
if err != nil {
res.Err = fmt.Errorf("resolve model %q: %w", tier, err)
return res
}
if model == nil {
// A resolver returning (ctx, nil, nil) would otherwise nil-panic inside
// the agent loop; surface it as a clean error (Run never panics out).
res.Err = fmt.Errorf("resolve model %q: resolver returned a nil model", tier)
return res
}
ctx = modelCtx
// Audit start (optional). The recorder satisfies RunTally; stamp it on the
// invocation so a self-status tool can read live progress.
var rec RunRecorder
var stateAcc *RunStateAccessor
if e.cfg.Ports.Audit != nil {
rec = e.cfg.Ports.Audit.StartRun(ctx, RunInfo{
RunID: inv.RunID,
SubjectID: ra.ID,
Name: ra.Name,
CallerID: inv.CallerID,
ChannelID: inv.ChannelID,
ParentRunID: inv.ParentRunID,
Inputs: inv.SkillInputs,
StartedAt: started,
})
}
if rec != nil {
stateAcc = NewRunStateAccessor(rec, maxIter, 0, started)
inv.RunState = stateAcc
}
// Build the toolbox from the agent's low-level tools.
toolbox, err := e.cfg.Registry.Build(ra.LowLevelTools, inv, tool.Visibility("private"), nil)
if err != nil {
res.Err = fmt.Errorf("build toolbox: %w", err)
e.finishAudit(ctx, rec, "error", res, started, res.Err)
return res
}
// Run context: bound by MaxRuntime, detached from the caller's deadline so a
// lane/queue wait doesn't eat the run budget (mort's V10 lesson). Caller
// cancellation still propagates via MergeCancellation. Created BEFORE the
// step observer so the observer forwards the merged run context (not a
// possibly-cancelled caller ctx) to OnStep consumers.
runCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), maxRuntime)
defer cancel()
runCtx, mergeCancel := MergeCancellation(runCtx, ctx)
defer mergeCancel()
// Step instrumentation: accumulate Result.Steps + fire inv.OnStep, feed the
// audit recorder, and keep the live iteration counter fresh. majordomo's
// step observer hands us each completed iteration; we zip the model's tool
// calls with their executed results PAIRWISE — a result without a matching
// call (or a call without a result) is skipped rather than recorded as an
// empty-name "ghost" step.
emitter := newStepEmitter(inv.OnStep)
stepObserver := func(s agent.Step) {
if stateAcc != nil {
stateAcc.SetIteration(s.Index)
}
if rec != nil {
rec.OnStep(s.Index, s.Response)
}
var calls []llm.ToolCall
if s.Response != nil {
calls = s.Response.ToolCalls
}
n := len(s.Results)
if len(calls) < n {
n = len(calls)
}
for i := 0; i < n; i++ {
call, r := calls[i], s.Results[i]
emitter.toolStart(runCtx, call.Name, call.Arguments)
emitter.toolEnd(runCtx, call, r.Content, r.IsError)
if rec != nil {
rec.OnTool(call, r.Content)
}
}
}
opts := []agent.Option{
agent.WithToolbox(toolbox),
agent.WithMaxSteps(maxIter),
agent.WithToolErrorLimits(e.cfg.Defaults.MaxConsecutiveToolErrors, e.cfg.Defaults.MaxSameToolCallRepeats),
agent.WithStepObserver(stepObserver),
}
if e.cfg.Compactor != nil && e.cfg.ContextTokens != nil {
if threshold := e.compactionThreshold(tier); threshold > 0 {
// Forward compaction events to the audit log (makes the
// CompactionEvent doc's "logged to the run trace" promise true).
var onFire func(compact.CompactionEvent)
if rec != nil {
onFire = func(ev compact.CompactionEvent) {
rec.LogEvent("compaction_fired", map[string]any{
"messages_before": ev.MessagesBefore,
"messages_after": ev.MessagesAfter,
"tokens_before": ev.TokensBefore,
"tokens_after": ev.TokensAfter,
})
}
}
opts = append(opts, agent.WithCompactor(e.cfg.Compactor(threshold, onFire)))
}
}
ag := agent.New(model, e.systemPrompt(ra), opts...)
runRes, runErr := ag.Run(runCtx, input)
status := statusFor(runErr)
if runRes != nil {
res.Output = runRes.Output
res.Usage = runRes.Usage
}
res.Steps = emitter.snapshot()
res.Err = runErr
e.finishAudit(ctx, rec, status, res, started, runErr)
if e.cfg.Ports.Budget != nil {
e.cfg.Ports.Budget.Commit(detach(ctx), inv.CallerID, time.Since(started).Seconds())
}
return res
}
// statusFor maps a run error to a RunStats.Status, distinguishing a deadline
// (timeout) and a cancellation (cancelled — caller cancel or shutdown) from a
// generic error so audit consumers can tell them apart.
func statusFor(runErr error) string {
switch {
case runErr == nil:
return "ok"
case errors.Is(runErr, context.DeadlineExceeded):
return "timeout"
case errors.Is(runErr, context.Canceled):
return "cancelled"
default:
return "error"
}
}
// finishAudit writes the terminal roll-up on a detached context so a cancelled
// run still records (mort's CleanupContextTimeout lesson).
func (e *Executor) finishAudit(ctx context.Context, rec RunRecorder, status string, res Result, started time.Time, runErr error) {
if rec == nil {
return
}
stats := RunStats{
Status: status,
Output: res.Output,
ToolCalls: rec.ToolCallsCount(),
RuntimeSeconds: time.Since(started).Seconds(),
}
if runErr != nil {
stats.Error = runErr.Error()
}
stats.InputTokens, stats.OutputTokens, stats.ThinkingTokens = rec.TokenStats()
rec.Close(detach(ctx), stats)
}
func (e *Executor) systemPrompt(ra RunnableAgent) string {
if e.cfg.SystemHeader == "" {
return ra.SystemPrompt
}
if ra.SystemPrompt == "" {
return e.cfg.SystemHeader
}
return e.cfg.SystemHeader + "\n\n" + ra.SystemPrompt
}
// compactionThreshold returns the token threshold for the tier's model context
// window (ratio × limit), or 0 when the limit is unknown.
func (e *Executor) compactionThreshold(tier string) int {
max := e.cfg.ContextTokens(tier)
if max <= 0 {
return 0
}
return int(float64(max) * e.cfg.Defaults.CompactionThresholdRatio)
}
// detach derives a bounded cleanup context off ctx, detached from its
// cancellation, for post-run writes. The cancel is intentionally not returned;
// CleanupContextTimeout bounds the lifetime.
func detach(ctx context.Context) context.Context {
c, cancel := context.WithTimeout(context.WithoutCancel(ctx), CleanupContextTimeout)
_ = cancel // bounded by the timeout; nothing to cancel early
return c
}
+168
View File
@@ -0,0 +1,168 @@
package run
import (
"context"
"errors"
"fmt"
"testing"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
// fakeModels returns a ModelResolver backed by a fake provider scripted to
// reply with the given text (no tool calls — the loop terminates immediately).
func fakeModels(t *testing.T, reply string) ModelResolver {
t.Helper()
fp := fake.New("fake")
fp.Enqueue("test-model", fake.Reply(reply))
m, err := fp.Model("test-model")
if err != nil {
t.Fatalf("fake model: %v", err)
}
return func(ctx context.Context, _ string) (context.Context, llm.Model, error) {
return ctx, m, nil
}
}
// TestExecutorRunHelloWorld is the milestone: executus runs an agent end-to-end
// against the fake provider and returns its output. Proves the kernel is
// runnable with the zero Ports (no persistence/audit/budget/critic).
func TestExecutorRunHelloWorld(t *testing.T) {
ex := New(Config{
Registry: tool.NewRegistry(),
Models: fakeModels(t, "hello from executus"),
})
res := ex.Run(context.Background(),
RunnableAgent{Name: "greeter", SystemPrompt: "be brief", ModelTier: "test-model"},
tool.Invocation{RunID: "run-1", CallerID: "caller-1"},
"say hi")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
if res.Output != "hello from executus" {
t.Fatalf("output = %q, want %q", res.Output, "hello from executus")
}
if res.RunID != "run-1" {
t.Errorf("RunID = %q, want run-1", res.RunID)
}
}
// TestExecutorBudgetRejection: a Budget that denies makes no model call.
func TestExecutorBudgetRejection(t *testing.T) {
denied := errors.New("over budget")
var modelCalled bool
models := func(ctx context.Context, _ string) (context.Context, llm.Model, error) {
modelCalled = true
return ctx, nil, nil
}
ex := New(Config{
Registry: tool.NewRegistry(),
Models: models,
Ports: Ports{Budget: budgetFunc{check: func(string) error { return denied }}},
})
res := ex.Run(context.Background(),
RunnableAgent{ModelTier: "test-model"},
tool.Invocation{RunID: "r", CallerID: "broke"}, "hi")
if !errors.Is(res.Err, denied) {
t.Fatalf("err = %v, want budget denial", res.Err)
}
if modelCalled {
t.Error("model must not be resolved/called when budget denies")
}
}
// TestExecutorAuditWiring: the Audit port receives StartRun + Close with the
// terminal status/output.
func TestExecutorAuditWiring(t *testing.T) {
rec := &captureRecorder{}
ex := New(Config{
Registry: tool.NewRegistry(),
Models: fakeModels(t, "done"),
Ports: Ports{Audit: auditFunc{start: func(RunInfo) RunRecorder { return rec }}},
})
res := ex.Run(context.Background(),
RunnableAgent{ModelTier: "test-model"},
tool.Invocation{RunID: "r2", CallerID: "c"}, "go")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
if !rec.closed {
t.Fatal("recorder.Close was not called")
}
if rec.stats.Status != "ok" {
t.Errorf("close status = %q, want ok", rec.stats.Status)
}
if rec.stats.Output != "done" {
t.Errorf("close output = %q, want done", rec.stats.Output)
}
}
// --- test doubles ---
type budgetFunc struct{ check func(callerID string) error }
func (b budgetFunc) Check(_ context.Context, callerID string) error { return b.check(callerID) }
func (b budgetFunc) Commit(context.Context, string, float64) {}
type auditFunc struct{ start func(RunInfo) RunRecorder }
func (a auditFunc) StartRun(_ context.Context, info RunInfo) RunRecorder { return a.start(info) }
type captureRecorder struct {
closed bool
stats RunStats
steps int
tools int
}
func (r *captureRecorder) TokenStats() (in, out, thinking int64) { return 0, 0, 0 }
func (r *captureRecorder) ToolCallsCount() int { return r.tools }
func (r *captureRecorder) OnStep(int, *llm.Response) { r.steps++ }
func (r *captureRecorder) OnTool(llm.ToolCall, string) { r.tools++ }
func (r *captureRecorder) LogEvent(string, map[string]any) {}
func (r *captureRecorder) LogError(string) {}
func (r *captureRecorder) Close(_ context.Context, s RunStats) { r.closed = true; r.stats = s }
// TestExecutorNilModelNoPanic: a resolver returning (ctx, nil, nil) yields a
// clean error, not a nil-pointer panic (gadfly F1, high severity).
func TestExecutorNilModelNoPanic(t *testing.T) {
ex := New(Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) {
return ctx, nil, nil // nil model, nil error
},
})
res := ex.Run(context.Background(),
RunnableAgent{ModelTier: "x"}, tool.Invocation{RunID: "r"}, "hi")
if res.Err == nil {
t.Fatal("expected an error for a nil model, got nil (would have panicked in the loop)")
}
}
// TestStatusFor maps run errors to RunStats.Status (gadfly F3).
func TestStatusFor(t *testing.T) {
cases := []struct {
err error
want string
}{
{nil, "ok"},
{context.DeadlineExceeded, "timeout"},
{context.Canceled, "cancelled"},
{fmt.Errorf("wrapped: %w", context.DeadlineExceeded), "timeout"},
{errors.New("boom"), "error"},
}
for _, c := range cases {
if got := statusFor(c.err); got != c.want {
t.Errorf("statusFor(%v) = %q, want %q", c.err, got, c.want)
}
}
}
+168
View File
@@ -0,0 +1,168 @@
package run
import (
"context"
"time"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/executus/deliver"
)
// Ports are the host seams the run executor consumes. Every field is nil-safe:
// a light host passes the zero Ports and gets a bounded, in-memory run with no
// persistence, audit, budget, critic, delegation, or delivery — which is
// exactly a gadfly swarm task. A heavy host (mort) wires each one to a battery.
//
// This struct IS the inversion: in mort, agentexec imports agents /
// agentcritic / skillaudit and skillexec imports skills / paste directly; here
// the kernel depends only on these interfaces, and the batteries implement
// them. The mort_*_adapters.go wall becomes the set of impls.
type Ports struct {
// Audit records the run trace (start, per-step/per-tool events, final
// stats). nil = no audit.
Audit Audit
// Budget gates and meters per-caller resource use. nil = unbounded.
Budget Budget
// Critic optionally monitors a long run for hangs/runaways. nil = none.
Critic Critic
// Checkpointer persists resumable progress for durable recovery. nil = no
// checkpointing (a run interrupted by shutdown is simply lost).
Checkpointer Checkpointer
// Palette resolves SkillPalette / SubAgentPalette entries into delegation
// tools (skill__<name> / agent__<name>). nil = those entries are inert.
Palette PaletteSource
// Delivery is where the run's output + artifacts go. nil = the caller
// reads the Result in-process (the light-host default).
Delivery deliver.Delivery
}
// RunInfo describes a run at start time — the attribution a recorder/critic
// needs. Host-neutral rename of mort's SkillRun start fields.
type RunInfo struct {
RunID string
SubjectID string // the agent/skill id being run (audit "skill_id")
Name string
CallerID string
ChannelID string
ParentRunID string
Inputs map[string]any
StartedAt time.Time
}
// RunStats is the terminal roll-up a recorder's Close writes. Mirrors mort's
// skillaudit/skillexec RunStats.
type RunStats struct {
Status string // ok | error | timeout | budget_exceeded | cancelled | dry_run
Output string
Error string
ToolCalls int
RuntimeSeconds float64
InputTokens int64
OutputTokens int64
ThinkingTokens int64
}
// --- Audit ---
// Audit begins recording a run. StartRun returns a per-run RunRecorder (or nil
// to skip recording this run). The audit battery wires its Storage behind this.
type Audit interface {
StartRun(ctx context.Context, info RunInfo) RunRecorder
}
// RunRecorder records the events of one in-flight run and its final stats. It
// satisfies RunTally so the kernel can surface live token/tool counts to the
// self-status tool. Mirrors mort's skillaudit.Writer.
type RunRecorder interface {
RunTally
// OnStep records one completed agent-loop iteration's model response.
OnStep(iter int, resp *llm.Response)
// OnTool records one executed tool call + its result.
OnTool(call llm.ToolCall, result string)
// LogEvent / LogError append structured events to the run log.
LogEvent(eventType string, payload map[string]any)
LogError(msg string)
// Close writes the terminal roll-up. Detaches from the caller's context
// internally so a cancelled run still records.
Close(ctx context.Context, stats RunStats)
}
// --- Budget ---
// Budget gates and meters per-caller resource use. Mirrors mort's
// skillexec.BudgetTracker.
type Budget interface {
// Check reports whether the caller has remaining budget (nil = allowed).
Check(ctx context.Context, callerID string) error
// Commit records that the caller spent runtimeSeconds on this run.
Commit(ctx context.Context, callerID string, runtimeSeconds float64)
}
// --- Critic ---
// Critic optionally monitors a long-running run (the two-tier soft/hard
// timeout). Monitor returns a handle the executor feeds progress into and
// queries for steer/deadline decisions; a nil handle means "not monitored".
//
// The exact wiring (how the handle's Steer/Deadline bind into majordomo's
// agent.WithSteer / agent.WithMaxStepsFunc / run-context cancellation) is
// finalized in the executor; this is the seam the agentcritic battery adapts.
type Critic interface {
Monitor(ctx context.Context, info RunInfo, softTimeout time.Duration) CriticHandle
}
// CriticHandle is the executor's live link to a run's critic.
type CriticHandle interface {
// RecordStep / RecordToolStart keep the critic's activity clock fresh so a
// healthy-but-slow run is not mistaken for a hang.
RecordStep(iter int)
RecordToolStart(name, args string)
// Steer returns any messages the critic wants injected into the loop (a
// nudge), drained before each step — matches majordomo agent.WithSteer.
Steer() []llm.Message
// Deadline returns the current hard-kill deadline (the critic may extend
// it); the executor binds the run context to it. Zero = no hard deadline.
Deadline() time.Time
// Stop ends monitoring when the run finishes.
Stop()
}
// --- Checkpointer ---
// Checkpointer persists a run's resumable progress for durable recovery.
// Mirrors mort's agentexec.RunCheckpointer.
type Checkpointer interface {
// Save persists the run's current resumable progress (throttled).
Save(ctx context.Context, st RunCheckpointState) error
// Complete clears the checkpoint on success.
Complete(ctx context.Context) error
// Fail clears the checkpoint on terminal failure. A run interrupted by
// shutdown is left untouched so boot recovery picks it up.
Fail(ctx context.Context, err error) error
}
// RunCheckpointState is the resumable snapshot a Checkpointer persists. Kept
// minimal here; the executor extends what it records during the merge.
type RunCheckpointState struct {
Messages []llm.Message
Iteration int
}
// --- PaletteSource ---
// PaletteSource resolves a RunnableAgent's SkillPalette / SubAgentPalette names
// into delegation tools and invokes them. Mirrors mort's
// SkillInvokerForPalette + AgentInvokerForPalette. nil Palette => palette
// entries are inert ("not configured" at first call).
type PaletteSource interface {
ResolveSkill(ctx context.Context, callerID, name string) (skillID string, err error)
InvokeSkill(ctx context.Context, callerID, channelID, name string,
inputs map[string]any, parentRunID string) (output, runID, status string, err error)
ResolveAgent(ctx context.Context, callerID, name string) (agentID string, err error)
InvokeAgent(ctx context.Context, callerID, channelID, name string,
prompt, parentRunID, modelTierOverride, promptPrepend string,
toolsSubset []string,
onEvent func(ctx context.Context, event, emoji string)) (output, runID, status string, err error)
}
+157
View File
@@ -0,0 +1,157 @@
// Package run is executus's run kernel: the shared run-loop mechanics around
// majordomo's agent loop, plus the host seams (run.Ports / RunnableAgent) that
// let one executor serve every surface — a light host's bounded one-shot run,
// a heavy host's persona agent or saved skill — without the kernel importing a
// battery.
//
// This file holds the genuinely-identical scaffolding both run shapes need:
// context cancellation merging, the detached-cleanup timeout, the per-run
// progress accessor the self-status tool reads, the legacy `submit`
// compatibility tool (submit.go), the ancestor progress bridge (progress.go),
// and the run-finalizer machinery — one source of truth.
//
// The kernel depends only on majordomo + executus/tool + the run.Ports
// interfaces; persistence, audit, the persona/skill nouns, and the critic are
// host-supplied via Ports (see ports.go) so importing the kernel never drags in
// a store or a battery.
package run
import (
"context"
"errors"
"log/slog"
"sync/atomic"
"time"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
// ErrShutdown is the cancellation cause set on mort's base lifecycle context
// when the process is shutting down (SIGTERM after the drain window). The
// agent executor uses it to distinguish a run interrupted by shutdown (which
// should be left durable-recoverable) from a run that errored or hit its own
// deadline (terminal).
var ErrShutdown = errors.New("mort: shutting down")
// CleanupContextTimeout caps how long a run's post-completion cleanup ops
// (budget commit, audit Close, attachment bookkeeping) may wait on
// storage after detaching from the caller's — possibly already
// cancelled — context. 10s is generous for a single-row UPDATE against
// MySQL; longer suggests a hung connection the run goroutine shouldn't
// keep waiting on. Both executors derive their cleanup contexts as
// context.WithTimeout(context.WithoutCancel(ctx), CleanupContextTimeout).
const CleanupContextTimeout = 10 * time.Second
// Reserved state-react lifecycle event keys, shared so both nouns surface
// the same UX shape. Namespaced with double-underscores to make accidental
// collision with a tool name near-impossible.
const (
StateReactStart = "__start__"
StateReactEnd = "__end__"
StateReactError = "__error__"
StateReactBudgetExceeded = "__budget_exceeded__"
)
// MergeCancellation returns a context cancelled when EITHER input is
// cancelled, propagating the cancellation Cause from whichever fired. Used
// by the lane preemption path (the lane's per-job ctx.Cause flows into the
// run context) and by the runtime-detach path (process shutdown still
// reaches a run whose deadline was reset after a lane wait). Always call
// the returned cancel to release the watcher goroutine; it is also invoked
// once when either input fires.
func MergeCancellation(parent, secondary context.Context) (context.Context, context.CancelFunc) {
merged, cancel := context.WithCancelCause(parent)
go func() {
select {
case <-merged.Done():
return
case <-secondary.Done():
cancel(context.Cause(secondary))
}
}()
return merged, func() { cancel(nil) }
}
// RunFinalizer is invoked at run finish so per-run tool state (open HTTP
// streams, per-run code_exec counters, per-run search budgets) is released
// and the process-lifetime maps keyed by run id don't grow unbounded.
// Both executors fire their registered finalizers via FireFinalizers.
type RunFinalizer interface {
FinalizeRun(runID string)
}
// FireFinalizers runs every finalizer for runID, isolating each behind a
// panic-recover so one buggy finalizer can't take down the run goroutine
// or skip the others. Safe to call with a nil/empty slice.
func FireFinalizers(fs []RunFinalizer, runID string) {
for _, f := range fs {
if f == nil {
continue
}
func() {
defer func() {
if r := recover(); r != nil {
slog.Error("runengine: run finalizer panicked",
"run_id", runID, "panic", r)
}
}()
f.FinalizeRun(runID)
}()
}
}
// RunTally is the narrow live-progress source the RunStateAccessor reads —
// the running token and tool-call counts for the in-flight run. The audit
// battery's writer satisfies it; this interface is how the run kernel reads
// live tallies without importing the audit package (the inversion of mort's
// direct *skillaudit.Writer dependency).
type RunTally interface {
// TokenStats returns the running input, output, and thinking token totals.
TokenStats() (in, out, thinking int64)
// ToolCallsCount returns the number of tool calls executed so far.
ToolCallsCount() int
}
// RunStateAccessor is the per-run live-progress accessor the executor
// stamps on Invocation.RunState before building the toolbox, so the
// self-status tool can report iteration / tool-calls / tokens / elapsed for
// the in-flight run. Construct with NewRunStateAccessor; the executor's step
// observer calls SetIteration each loop.
type RunStateAccessor struct {
tally RunTally
iter atomic.Int32
maxIter int
maxCalls int
startedAt time.Time
}
// NewRunStateAccessor builds the accessor. writer supplies the live token
// + tool-call tallies; maxIter / maxCalls are the reported caps (0 =
// uncapped); startedAt anchors the elapsed clock.
func NewRunStateAccessor(tally RunTally, maxIter, maxCalls int, startedAt time.Time) *RunStateAccessor {
return &RunStateAccessor{
tally: tally,
maxIter: maxIter,
maxCalls: maxCalls,
startedAt: startedAt,
}
}
// SetIteration records the current agent-loop iteration (called from the
// executor's step observer).
func (a *RunStateAccessor) SetIteration(iter int) { a.iter.Store(int32(iter)) }
// RunState satisfies tool.RunStateAccessor.
func (a *RunStateAccessor) RunState() tool.RunState {
in, out, think := a.tally.TokenStats()
return tool.RunState{
Iteration: int(a.iter.Load()),
MaxIterations: a.maxIter,
ToolCalls: a.tally.ToolCallsCount(),
MaxToolCalls: a.maxCalls,
InputTokens: in,
OutputTokens: out,
ThinkingTokens: think,
ElapsedSeconds: int(time.Since(a.startedAt).Seconds()),
}
}
+419
View File
@@ -0,0 +1,419 @@
package run
// steps.go — the per-run step emitter and the tool→step presentation
// mapping. This is the single place that turns the executor's post-step
// observer into ordered tool.Step records: one per tool call, each with a
// stable id, an open-vocabulary kind, and a human present-tense summary
// that flips running→complete/error.
//
// One source feeds two consumers (mirroring the OnEvent/OnToolEvent/
// PostRunResult pattern): the live tool.Invocation.OnStep callback
// (nil-safe) AND snapshot(), which the executor copies onto Result.Steps.
// Because the Result accumulation does not depend on OnStep being set,
// every surface — chat (JSON + SSE), Discord, cron, sub-agents — carries
// steps; OnStep is needed only for live streaming.
//
// LIMITATION (current): majordomo exposes only a POST-step observer, so
// the executor calls toolStart+toolEnd back-to-back after each tool has
// already run. Steps are therefore recorded faithfully, but step.StartedAt
// ≈ EndedAt and the intermediate "running" phase is never observable to a
// live OnStep consumer. A pre-dispatch hook (wrapping each tool's handler
// to emit toolStart before execution, like mort's state-react decorator)
// is a follow-up that would restore real start timing + the running phase.
// The emitter already supports that two-call shape — toolStart and toolEnd
// are separate methods — so wiring it later is additive.
import (
"context"
"encoding/json"
"fmt"
"net/url"
"strings"
"time"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
// stepSummaryMaxLen caps the human summary length (section G size cap).
// Detail is unused in v1 (no live detail source while replies are
// generated blocking) so there is no Detail cap yet.
const stepSummaryMaxLen = 200
// stepEmitter accumulates ordered steps for one run and fires the live
// OnStep callback.
//
// Concurrency: touched ONLY from the agent-loop goroutine — the executor's
// stepObserver (and, once a pre-dispatch hook is wired, that hook) run
// there; majordomo executes a step's tool calls sequentially, and
// sub-agents build their own Invocation so they never reach this
// emitter. Same single-goroutine contract as the audit Writer and the
// critic ProgressRecorder — no internal lock.
type stepEmitter struct {
onStep func(ctx context.Context, ev tool.StepEvent)
now func() time.Time
seq int
steps []tool.Step // ordered; the snapshot for Result.Steps
byID map[string]int // step id -> index into steps
pending map[string][]string // correlation key -> queued running ids (FIFO)
}
// newStepEmitter returns an emitter that forwards to onStep (nil-safe).
func newStepEmitter(onStep func(ctx context.Context, ev tool.StepEvent)) *stepEmitter {
return &stepEmitter{
onStep: onStep,
now: time.Now,
byID: map[string]int{},
pending: map[string][]string{},
}
}
// corrKey correlates a "start" (name + raw args, no call id available at
// the pre-dispatch hook) with its later "end" (the stepObserver has the
// full call incl. id + the same raw args).
func corrKey(name string, args json.RawMessage) string {
return name + "\x00" + string(args)
}
// toolStart records + emits the "start" of a tool call. Called from the
// pre-dispatch hookToolbox closure, before the tool runs.
func (e *stepEmitter) toolStart(ctx context.Context, name string, args json.RawMessage) {
if e == nil {
return
}
step := e.newStep(name, args)
key := corrKey(name, args)
e.pending[key] = append(e.pending[key], step.ID)
e.fire(ctx, "start", step)
}
// toolEnd records + emits the terminal "end" of a tool call. Called from
// the stepObserver for each completed tool call. If no matching start was
// seen (e.g. a tool with a nil handler the pre-dispatch hook skipped), a
// start is synthesized so the step still appears.
func (e *stepEmitter) toolEnd(ctx context.Context, call llm.ToolCall, result string, isError bool) {
if e == nil {
return
}
id := e.matchPending(call.Name, call.Arguments)
if id == "" {
id = e.newStep(call.Name, call.Arguments).ID
}
idx, ok := e.byID[id]
if !ok {
return
}
step := &e.steps[idx]
end := e.now()
step.EndedAt = &end
if isError {
step.Status = "error"
} else {
step.Status = "complete"
}
if s := summaryForEnd(call.Name, call.Arguments, result, isError); s != "" {
step.Summary = s
}
e.fire(ctx, "end", *step)
}
// newStep mints + appends a running step and returns it (by value).
func (e *stepEmitter) newStep(name string, args json.RawMessage) tool.Step {
e.seq++
step := tool.Step{
ID: fmt.Sprintf("s%d", e.seq),
Kind: kindForTool(name),
Title: name,
Summary: summaryForStart(name, args),
Status: "running",
StartedAt: e.now(),
}
e.byID[step.ID] = len(e.steps)
e.steps = append(e.steps, step)
return step
}
// matchPending pops the oldest running step id for (name, args). Falls back to
// the OLDEST still-running step of the same tool name when the args don't
// byte-match between start and end (e.g. JSON key reordering). FIFO on the
// fallback too, consistent with the per-key queue pop above — closing the
// oldest avoids mis-correlating concurrent same-named calls. Returns "" on no
// match.
func (e *stepEmitter) matchPending(name string, args json.RawMessage) string {
key := corrKey(name, args)
if q := e.pending[key]; len(q) > 0 {
id := q[0]
if len(q) == 1 {
delete(e.pending, key)
} else {
e.pending[key] = q[1:]
}
return id
}
for i := 0; i < len(e.steps); i++ {
if e.steps[i].Title == name && e.steps[i].Status == "running" {
return e.steps[i].ID
}
}
return ""
}
func (e *stepEmitter) fire(ctx context.Context, phase string, step tool.Step) {
if e.onStep == nil {
return
}
e.onStep(ctx, tool.StepEvent{Phase: phase, Step: step})
}
// snapshot returns a copy of the ordered, deduplicated step set for the
// run Result. A step still "running" at run end (e.g. the run was
// cancelled mid-tool-call) is reported as-is.
func (e *stepEmitter) snapshot() []tool.Step {
if e == nil || len(e.steps) == 0 {
return nil
}
out := make([]tool.Step, len(e.steps))
copy(out, e.steps)
return out
}
// kindForTool maps a tool name to an open-vocabulary step kind. Unknown
// tools fall back to "tool" — never an error, just a generic step (the
// client maps unknown kinds to a default icon). Loosely tracks the
// catalog in pkg/skilltools/CLAUDE.md.
func kindForTool(name string) string {
switch name {
case "web_search", "search_reddit", "wikipedia_summary":
return "search"
case "read_page", "read_pdf", "read_reddit", "read_video", "verify_url",
"summary_summarise", "summarize", "file_get_text", "file_get_metadata",
"http_get", "http_post", "http_get_stream", "http_stream_read":
return "read"
case "code_exec", "calculate":
return "code"
case "file_save", "file_get", "file_list", "file_delete", "file_search":
return "file"
case "kv_get", "kv_set", "kv_list", "kv_delete",
"remember", "recall", "chatbot_get_memories":
return "memory"
case "query", "query_research", "deepresearch", "animate",
"agent_invoke", "agent_invoke_parallel", "agent_spawn",
"agent_spawn_parallel", "skill_invoke", "skill_invoke_parallel":
return "delegate"
case "think":
return "thinking"
default:
switch {
case strings.HasPrefix(name, "image") || strings.Contains(name, "draw"):
return "image"
default:
return "tool"
}
}
}
// summaryForStart builds the human present-tense running summary. It
// derives specifics from safe arg fields only; secret-bearing tools
// (mcp_call, email_send, http_*) are summarized without echoing args.
func summaryForStart(name string, args json.RawMessage) string {
var s string
switch name {
case "web_search":
if q := argString(args, "query", "q"); q != "" {
s = fmt.Sprintf("Searching the web for %q", q)
} else {
s = "Searching the web"
}
case "search_reddit":
if q := argString(args, "query", "q"); q != "" {
s = fmt.Sprintf("Searching Reddit for %q", q)
} else {
s = "Searching Reddit"
}
case "wikipedia_summary":
if q := argString(args, "query", "title"); q != "" {
s = fmt.Sprintf("Looking up %q on Wikipedia", q)
} else {
s = "Looking up Wikipedia"
}
case "read_page", "read_pdf", "read_reddit", "read_video", "verify_url":
if u := argString(args, "url", "post", "page"); u != "" {
s = "Reading " + hostOf(u)
} else {
s = "Reading a page"
}
case "http_get", "http_post", "http_get_stream":
// Show host only — a full URL can embed credentials/tokens.
if u := argString(args, "url"); u != "" {
s = "Fetching " + hostOf(u)
} else {
s = "Making an HTTP request"
}
case "summary_summarise", "summarize":
s = "Summarizing text"
case "translate":
if lang := argString(args, "target_lang", "target_language", "lang"); lang != "" {
s = "Translating to " + lang
} else {
s = "Translating text"
}
case "code_exec":
s = "Running code"
case "calculate":
if q := argString(args, "query", "expression", "expr"); q != "" {
s = "Calculating " + truncateStep(q, 60)
} else {
s = "Calculating"
}
case "remember":
// Never echo the stored value.
s = "Saving a memory"
case "recall", "chatbot_get_memories":
s = "Recalling memories"
case "kv_get", "kv_list":
s = "Reading saved data"
case "kv_set":
s = "Saving data"
case "kv_delete":
s = "Deleting saved data"
case "file_save":
if n := argString(args, "name", "filename"); n != "" {
s = "Saving file " + truncateStep(n, 60)
} else {
s = "Saving a file"
}
case "file_get", "file_get_text", "file_get_metadata":
s = "Reading a file"
case "file_list", "file_search":
s = "Listing files"
case "query", "query_research":
if q := argString(args, "query", "question", "prompt", "task"); q != "" {
s = "Researching " + truncateStep(q, 80)
} else {
s = "Researching"
}
case "deepresearch":
s = "Running deep research"
case "animate":
s = "Generating an animation"
case "agent_invoke", "agent_spawn":
if a := argString(args, "agent", "agent_name", "name"); a != "" {
s = "Delegating to " + a
} else {
s = "Delegating to a sub-agent"
}
case "agent_invoke_parallel", "agent_spawn_parallel":
s = "Delegating to sub-agents"
case "skill_invoke":
if sk := argString(args, "skill_name", "skill", "name"); sk != "" {
s = "Running skill " + sk
} else {
s = "Running a skill"
}
case "skill_invoke_parallel":
s = "Running skills in parallel"
case "think":
s = "Thinking"
case "mcp_call":
// Redact: MCP args frequently carry secrets. Name server/tool only.
srv, tl := argString(args, "server"), argString(args, "tool")
switch {
case srv != "" && tl != "":
s = fmt.Sprintf("Calling %s/%s", srv, tl)
case srv != "":
s = "Calling " + srv
default:
s = "Calling an MCP tool"
}
case "email_send":
// Redact recipients + body.
s = "Sending an email"
default:
s = "Using " + name
}
return truncateStep(s, stepSummaryMaxLen)
}
// summaryForEnd optionally upgrades the summary to a cheap result phrase.
// Returns "" to keep the running summary (the caller then just flips the
// status). Never returns a phrase derived from raw result bytes.
func summaryForEnd(name string, _ json.RawMessage, result string, isError bool) string {
if isError {
return ""
}
switch name {
case "web_search", "search_reddit":
if n := countResults(result); n >= 0 {
return fmt.Sprintf("Found %d result%s", n, plural(n))
}
}
return ""
}
// argString pulls the first present non-empty string field from a tool's
// raw JSON args, trying keys in order. Returns "" when none parse.
func argString(args json.RawMessage, keys ...string) string {
if len(args) == 0 {
return ""
}
var m map[string]any
if err := json.Unmarshal(args, &m); err != nil {
return ""
}
for _, k := range keys {
if v, ok := m[k]; ok {
if s, ok := v.(string); ok && strings.TrimSpace(s) != "" {
return strings.TrimSpace(s)
}
}
}
return ""
}
// countResults parses a v11-style {"results":[...]} envelope and returns
// the count, or -1 when the shape doesn't match.
func countResults(result string) int {
if strings.TrimSpace(result) == "" {
return -1
}
var env struct {
Results []json.RawMessage `json:"results"`
}
if err := json.Unmarshal([]byte(result), &env); err != nil || env.Results == nil {
return -1
}
return len(env.Results)
}
// hostOf returns the bare host (no leading www.) of a URL, or a short
// form of the raw string when it doesn't parse as a URL.
func hostOf(raw string) string {
if u, err := url.Parse(raw); err == nil && u.Host != "" {
return strings.TrimPrefix(u.Host, "www.")
}
return truncateStep(raw, 60)
}
// truncateStep rune-safely caps s to max, appending an ellipsis when cut.
func truncateStep(s string, max int) string {
if max <= 0 {
return ""
}
r := []rune(s)
if len(r) <= max {
return s
}
if max == 1 {
return string(r[:1])
}
return string(r[:max-1]) + "…"
}
func plural(n int) string {
if n == 1 {
return ""
}
return "s"
}
+84
View File
@@ -0,0 +1,84 @@
package run
import (
"context"
"strings"
"sync"
llm "gitea.stevedudenhoeffer.com/steve/majordomo/llm"
)
// SubmitCapture records the output a run's `submit` tool received.
//
// Why this exists: legacy agentkit injected a synthetic `submit` tool and
// ended the loop when it fired; years of mort system prompts (agent
// YAMLs, skill manifests, the executors' platform headers) teach the
// model to "call submit with your final answer". majordomo's agent loop
// has no submit concept — it ends when the model replies WITHOUT tool
// calls. Dropping submit cold would make every prompt-trained model
// burn turns on "unknown tool \"submit\"" errors.
//
// The compatibility shape: the executors add NewSubmitTool's tool to
// every run's toolset (unless the palette already defines a `submit`).
// The handler records the FIRST submitted answer and tells the model
// the answer was accepted so its next turn is a bare reply (which ends
// the loop naturally). After the run, the executor consults
// Output(loopOutput, runErr): a captured submission wins over an empty
// or budget-exhausted ending, so a model that submits on its final
// allowed step still produces its answer instead of ErrMaxSteps.
type SubmitCapture struct {
mu sync.Mutex
output string
called bool
}
// Record stores the first submitted answer; later calls are ignored
// (matching legacy agentkit's "multiple calls keep the first" contract).
func (c *SubmitCapture) Record(output string) {
c.mu.Lock()
defer c.mu.Unlock()
if c.called {
return
}
c.called = true
c.output = output
}
// Submitted returns the captured answer and whether submit fired.
func (c *SubmitCapture) Submitted() (string, bool) {
c.mu.Lock()
defer c.mu.Unlock()
return c.output, c.called
}
// Output resolves the run's final output: the submitted answer when the
// model called submit (parity with legacy agentkit, where submit's argument
// WAS the run output), otherwise the loop's own final text. resolvedErr
// is nil when a submission exists — a run that submitted its answer and
// then ran out of steps (or timed out composing the courtesy
// confirmation turn) is a SUCCESS, not an error.
func (c *SubmitCapture) Output(loopOutput string, runErr error) (output string, resolvedErr error) {
if out, ok := c.Submitted(); ok {
return out, nil
}
return loopOutput, runErr
}
// submitArgs mirrors legacy agentkit's synthetic submit tool schema so
// models prompted under the old contract emit compatible calls.
type submitArgs struct {
Output string `json:"output" description:"The final answer, summary, or output for this task."`
}
// NewSubmitTool builds the compatibility `submit` tool bound to the
// given capture. Both executors (skill + agent) install one per run.
func NewSubmitTool(capture *SubmitCapture) llm.Tool {
return llm.DefineTool[submitArgs](
"submit",
"Submit your final answer or output to end this task. Call exactly once when you are done.",
func(_ context.Context, args submitArgs) (any, error) {
capture.Record(strings.TrimSpace(args.Output))
return "Final answer recorded. Do not call any more tools; reply now with a brief closing message.", nil
},
)
}