P2: run kernel + run.Ports inversion — executus is runnable #2
@@ -1,10 +1,11 @@
|
||||
# Gadfly — agentic adversarial PR reviewer (https://gitea.stevedudenhoeffer.com/steve/gadfly).
|
||||
#
|
||||
# Runs the published Gadfly image (pinned to :v1) as a specialist swarm and posts
|
||||
# Runs the published Gadfly image (pinned to an immutable :sha- tag — act_runner
|
||||
# caches :latest, and this build is what carries foreman provider-type support)
|
||||
# as a specialist swarm and posts
|
||||
# ONE consolidated review comment as gitea-actions. Advisory only — never blocks a
|
||||
# merge. This reviews executus PRs (same setup as mort: m1/m5 locals + 2 cloud,
|
||||
# 3-lens suite). Gadfly is a simple system — treat its findings as advisory and
|
||||
# double-check before acting.
|
||||
# merge. This reviews executus PRs with 3 ollama-cloud models (3-lens suite). Gadfly
|
||||
# is a simple system — findings are advisory; always double-check before acting.
|
||||
|
||||
name: Adversarial Review (Gadfly)
|
||||
|
||||
@@ -40,24 +41,21 @@ jobs:
|
||||
|| github.actor == 'fizi'
|
||||
|| github.actor == 'dazed'))
|
||||
runs-on: ubuntu-latest
|
||||
# Full fleet (2 cloud + 2 local Macs, all running concurrently) reviewing
|
||||
# every PR with the 3-lens suite — the slow local lanes dominate wall time.
|
||||
timeout-minutes: 90
|
||||
# 3 cloud models, all concurrent, 3-lens suite. ~12 min typical.
|
||||
timeout-minutes: 30
|
||||
steps:
|
||||
- uses: docker://gitea.stevedudenhoeffer.com/steve/gadfly:v1
|
||||
- uses: docker://gitea.stevedudenhoeffer.com/steve/gadfly:sha-6e3a83c
|
||||
env:
|
||||
GITEA_API: ${{ github.server_url }}/api/v1/repos/${{ github.repository }}
|
||||
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
||||
OLLAMA_CLOUD_API_KEY: ${{ secrets.OLLAMA_CLOUD_API_KEY }}
|
||||
# Local Ollama boxes (each its own lane, cap 1). NOTE: both Macs must be
|
||||
# awake/reachable for their reviews to run; if a box is offline, that
|
||||
# model's comment shows an error and the others still post.
|
||||
GADFLY_ENDPOINT_M1PRO: "ollama|http://192.168.0.175:11434"
|
||||
GADFLY_ENDPOINT_M5MAX: "ollama|http://192.168.0.173:11434"
|
||||
# 2 cloud (parallel) + M1 Pro + M5 Max — one consolidated comment each.
|
||||
GADFLY_MODELS: "minimax-m3:cloud,deepseek-v4-flash:cloud,m1pro/qwen3:14b,m5max/qwen3.6:35b-mlx"
|
||||
# cloud runs 2 at once; each Mac one at a time; all three lanes parallel.
|
||||
GADFLY_PROVIDER_CONCURRENCY: "ollama-cloud=2,m1pro=1,m5max=1"
|
||||
# executus uses CLOUD MODELS ONLY. The local Macs (m1/m5) were dropped:
|
||||
# on a P2-review measurement they took 26–29 min (with lens timeouts)
|
||||
# and contributed ZERO real findings — the two cloud models found every
|
||||
# genuine bug in 6–12 min. Cloud-only is faster AND higher-signal.
|
||||
# 3 cloud models, one consolidated comment each, all run in parallel.
|
||||
GADFLY_MODELS: "minimax-m3:cloud,deepseek-v4-flash:cloud,glm-5.2:cloud"
|
||||
GADFLY_PROVIDER_CONCURRENCY: "ollama-cloud=3"
|
||||
# Default => the 3-lens suite (security, correctness, error-handling).
|
||||
# Set the repo var GADFLY_SPECIALISTS to override (csv / "all" / "auto").
|
||||
GADFLY_SPECIALISTS: ${{ vars.GADFLY_SPECIALISTS || 'security,correctness,error-handling' }}
|
||||
|
||||
@@ -43,8 +43,13 @@ CORE (majordomo + stdlib):
|
||||
fanout/ programmatic N×M swarm [P0 ✓]
|
||||
deliver/ output egress seam (+ Discard/Stdout) [P0 ✓]
|
||||
identity/ caller identity seams [P0 ✓]
|
||||
run/ progress bridge now; the executor kernel + [P0 partial]
|
||||
nil-safe Ports + RunnableAgent later [P2]
|
||||
run/ run.Executor is RUNNABLE: model-resolve + [P2 core ✓]
|
||||
toolbox + majordomo loop + compaction +
|
||||
run-bounding (V10 detached timeout) + step/
|
||||
audit observers + Budget gate; RunnableAgent
|
||||
DTO + nil-safe run.Ports. Follow-ups: wire
|
||||
Critic/Checkpointer/PaletteSource/Delivery,
|
||||
Phases, and the no-tools direct path [P2]
|
||||
dispatchguard/ loop/depth/fan-out caps [P0 ✓]
|
||||
pendingattach/ attachment dedupe [P0 ✓]
|
||||
tool/ registry + 3-stage permissions + ssrf [P1 ✓]
|
||||
@@ -52,7 +57,7 @@ CORE (majordomo + stdlib):
|
||||
(convar->config.Source; UsageSink/TraceSink seams; GenerateWith[T]
|
||||
structured output — no separate structured/ pkg)
|
||||
llmmeta/ shared meta-LLM helper over model/ [P1 ✓]
|
||||
compact/ context compactor (WithCompactor hook) [P2]
|
||||
compact/ context compactor (WithCompactor hook) [P2 ✓]
|
||||
tools/{web,net,store,compose,meta,comms} generic tools [P3]
|
||||
|
||||
BATTERIES (opt-in siblings, each nil-safe + a default):
|
||||
|
||||
@@ -31,15 +31,23 @@ bot) — mort and gadfly are the first two consumers (heavy and light). See
|
||||
|
||||
[mort]: https://gitea.stevedudenhoeffer.com/steve/mort
|
||||
|
||||
**Available today (P0):**
|
||||
**Available today:**
|
||||
|
||||
- `run/` — **executus is runnable.** `run.Executor` ties model resolution, the
|
||||
tool registry, majordomo's agent loop, context compaction, run-bounding, and
|
||||
step/audit instrumentation into one `Run(ctx, RunnableAgent, inv) Result`, with
|
||||
every host concern behind a nil-safe `run.Ports` (Audit/Budget/Critic/
|
||||
Checkpointer/PaletteSource/Delivery). See `examples/minimal`.
|
||||
- `model/` — config-driven tier resolution + failover over majordomo, with
|
||||
pluggable `UsageSink`/`TraceSink` and `GenerateWith[T]` structured output.
|
||||
- `tool/` — the tool registry + 3-stage permission model + SSRF guard.
|
||||
- `compact/` — the per-run context compactor.
|
||||
- `lane/` — bounded worker pool with fair-share queueing (run- and
|
||||
provider-concurrency).
|
||||
- `fanout/` — programmatic N×M swarm with bounded global + per-key concurrency.
|
||||
- `config/` — the host config seam (`Source`) with an env-var default.
|
||||
- `deliver/` — the output-egress seam with `Discard`/`Stdout` defaults.
|
||||
- `identity/` — caller-identity seams (`AdminPolicy`, `MemberResolver`).
|
||||
- `dispatchguard/`, `pendingattach/`, `run/progress.go` — run-safety primitives.
|
||||
- `config/`, `deliver/`, `identity/` — host seams (config / output / identity),
|
||||
each with a shipped default.
|
||||
- `dispatchguard/`, `pendingattach/` — run-safety primitives.
|
||||
|
||||
## Design
|
||||
|
||||
|
||||
@@ -0,0 +1,369 @@
|
||||
// V15.2 context compactor (re-based on majordomo).
|
||||
//
|
||||
// Why: the agent loop accumulates tool results indefinitely. A
|
||||
// research-heavy run with many web_search / read_page / http_get
|
||||
// results easily crosses 200K tokens and trips the model's HTTP-400
|
||||
// "prompt too long" rejection mid-run (observed at 410K tokens on
|
||||
// qwen3-coder:480b which has a 262K cap). majordomo's agent loop calls
|
||||
// the compactor with the full message slice before every model call;
|
||||
// the compactor returns a shorter slice that preserves the system
|
||||
// prompt + recent messages, with the middle range replaced by a
|
||||
// synthetic summary.
|
||||
//
|
||||
// Strategy (unchanged from the agentkit era):
|
||||
// - Keep any leading system message verbatim. (Under majordomo the
|
||||
// system prompt normally travels in Request.System, not in the
|
||||
// message slice, so this is defensive.)
|
||||
// - Keep the last KeepRecent messages verbatim. This ensures the
|
||||
// agent has fresh tool state to act on; compacting too aggressively
|
||||
// would strip the in-flight context it needs to make the next
|
||||
// decision.
|
||||
// - Compress the middle range via a single fast-tier LLM call that
|
||||
// receives the middle messages as raw text and produces a paragraph
|
||||
// summary (URLs visited, key findings, file_ids created, what the
|
||||
// agent is trying to accomplish).
|
||||
// - Replace the middle range with one synthetic user-role message
|
||||
// containing the summary. (user-role chosen because tool-result-role
|
||||
// would be ambiguous without a matching tool_call_id.)
|
||||
//
|
||||
// What moved in the majordomo conversion:
|
||||
// - The token-threshold gate lives HERE now. agentkit estimated
|
||||
// tokens and only invoked the compactor past a configured
|
||||
// threshold; majordomo's hook fires before every model call, so
|
||||
// the threshold check (estimateTokens vs the per-run threshold the
|
||||
// executor computes from the model's context limit) is the
|
||||
// compactor's first step.
|
||||
// - The compactor is per-run STATEFUL: majordomo does not replace
|
||||
// the loop's internal transcript with the compacted slice (the
|
||||
// hook shapes only what is SENT), so without memory the middle
|
||||
// would be re-summarised from scratch on every step past the
|
||||
// threshold. The state remembers how far the transcript has been
|
||||
// folded into the running summary and folds the previous summary
|
||||
// into the next one instead of re-paying for it.
|
||||
//
|
||||
// Failure path: any error (LLM unavailable, malformed response, etc.)
|
||||
// is returned to the agent loop, which treats compactor errors as
|
||||
// non-fatal and sends the original slice — if the next model call hits
|
||||
// the provider's limit, the existing HTTP-400 error path takes over.
|
||||
|
||||
package compact
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||
)
|
||||
|
||||
// ModelResolver resolves a tier/spec to a usable llm.Model (and an enriched
|
||||
// context for usage attribution). model.ParseModelForContext satisfies it.
|
||||
type ModelResolver func(ctx context.Context, tier string) (context.Context, llm.Model, error)
|
||||
|
||||
// Compactor is the per-run compaction hook handed to the agent loop
|
||||
// (matches the signature agent.WithCompactor expects).
|
||||
type Compactor = func(ctx context.Context, msgs []llm.Message) ([]llm.Message, error)
|
||||
|
||||
// CompactionEvent describes one fired compaction so the executor can
|
||||
// log it to skill_run_logs ("compaction_fired").
|
||||
type CompactionEvent struct {
|
||||
// MessagesBefore/After count the messages that would have been
|
||||
// sent without/with this compaction.
|
||||
MessagesBefore int
|
||||
MessagesAfter int
|
||||
// TokensBefore/After are the estimateTokens values for the same
|
||||
// two slices.
|
||||
TokensBefore int
|
||||
TokensAfter int
|
||||
}
|
||||
|
||||
// CompactorFactory mints a fresh per-run Compactor bound to a token
|
||||
// threshold. onFire (nil-safe) observes every compaction that actually
|
||||
// fires. A non-positive threshold yields a pass-through compactor.
|
||||
type CompactorFactory func(thresholdTokens int, onFire func(CompactionEvent)) Compactor
|
||||
|
||||
// CompactorConfig controls the v15.2 compactor's behaviour. Construct
|
||||
// in mort.go from convars + the executor's ModelResolver.
|
||||
type CompactorConfig struct {
|
||||
// Models resolves a model spec (typically "fast" or a specific tier)
|
||||
// to an llm.Model. Required; nil disables compaction.
|
||||
Models ModelResolver
|
||||
|
||||
// SummarizerTier is the model tier used for the compression LLM call.
|
||||
// Production default "fast"; an admin may set "haiku" or a specific
|
||||
// model spec. Empty falls back to "fast".
|
||||
SummarizerTier string
|
||||
|
||||
// KeepRecent is the number of trailing messages preserved verbatim.
|
||||
// Default 8. Lower values compact more aggressively (the next loop
|
||||
// iteration sees less recent context); higher values keep more.
|
||||
KeepRecent int
|
||||
|
||||
// SummaryWordCap bounds the LLM-generated summary length. Default
|
||||
// 200 words ≈ 800 chars ≈ 200 tokens — small enough that the
|
||||
// compaction always shrinks the slice meaningfully.
|
||||
SummaryWordCap int
|
||||
}
|
||||
|
||||
// NewCompactor returns a CompactorFactory implementing the middle-range
|
||||
// summarisation strategy. nil cfg.Models returns a factory of no-op
|
||||
// compactors that always return the input unchanged (degrades to
|
||||
// v15.1 behaviour).
|
||||
func NewCompactor(cfg CompactorConfig) CompactorFactory {
|
||||
if cfg.Models == nil {
|
||||
return func(int, func(CompactionEvent)) Compactor {
|
||||
return func(_ context.Context, msgs []llm.Message) ([]llm.Message, error) {
|
||||
return msgs, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
if cfg.SummarizerTier == "" {
|
||||
cfg.SummarizerTier = "fast"
|
||||
}
|
||||
if cfg.KeepRecent <= 0 {
|
||||
cfg.KeepRecent = 8
|
||||
}
|
||||
if cfg.SummaryWordCap <= 0 {
|
||||
cfg.SummaryWordCap = 200
|
||||
}
|
||||
return func(threshold int, onFire func(CompactionEvent)) Compactor {
|
||||
st := &compactionState{}
|
||||
return func(ctx context.Context, msgs []llm.Message) ([]llm.Message, error) {
|
||||
return compactIfNeeded(ctx, cfg, st, threshold, onFire, msgs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// compactionState is the per-run fold memory: msgs[:prefixEnd]
|
||||
// (excluding a leading system message) are represented by summaryText
|
||||
// in the rendered slice. Guarded by mu for safety although the agent
|
||||
// loop invokes the hook from a single goroutine.
|
||||
type compactionState struct {
|
||||
mu sync.Mutex
|
||||
prefixEnd int
|
||||
summaryText string
|
||||
}
|
||||
|
||||
// compactIfNeeded is the workhorse: render the transcript with any
|
||||
// existing fold applied, check the threshold, and fold more of the
|
||||
// middle into the summary when the rendered size still exceeds it.
|
||||
func compactIfNeeded(ctx context.Context, cfg CompactorConfig, st *compactionState,
|
||||
threshold int, onFire func(CompactionEvent), msgs []llm.Message) ([]llm.Message, error) {
|
||||
st.mu.Lock()
|
||||
defer st.mu.Unlock()
|
||||
|
||||
rendered := renderCompacted(st, msgs)
|
||||
if threshold <= 0 {
|
||||
return rendered, nil
|
||||
}
|
||||
tokensBefore := estimateTokens(rendered)
|
||||
if tokensBefore < threshold {
|
||||
return rendered, nil
|
||||
}
|
||||
|
||||
// Determine the new middle range to fold: everything between what
|
||||
// is already summarised (or the optional leading system message)
|
||||
// and the KeepRecent tail.
|
||||
startMiddle := st.prefixEnd
|
||||
if startMiddle == 0 && len(msgs) > 0 && msgs[0].Role == llm.RoleSystem {
|
||||
startMiddle = 1
|
||||
}
|
||||
endMiddle := len(msgs) - cfg.KeepRecent
|
||||
if endMiddle <= startMiddle {
|
||||
// Nothing new to fold (the tail alone exceeds the threshold).
|
||||
// Return the rendered slice; the model call may still succeed.
|
||||
return rendered, nil
|
||||
}
|
||||
middle := msgs[startMiddle:endMiddle]
|
||||
|
||||
summary, err := summariseMiddle(ctx, cfg, st.summaryText, middle)
|
||||
if err != nil {
|
||||
// Non-fatal upstream: the agent loop sends the ORIGINAL slice. Return
|
||||
// msgs, not `rendered` — on a second+ compaction `rendered` already
|
||||
// carries a prior synthetic summary, which is not the documented
|
||||
// "original slice" the loop expects on a compactor error.
|
||||
return msgs, fmt.Errorf("compactor: summarise middle: %w", err)
|
||||
}
|
||||
st.summaryText = summary
|
||||
st.prefixEnd = endMiddle
|
||||
|
||||
out := renderCompacted(st, msgs)
|
||||
if onFire != nil {
|
||||
onFire(CompactionEvent{
|
||||
MessagesBefore: len(rendered),
|
||||
MessagesAfter: len(out),
|
||||
TokensBefore: tokensBefore,
|
||||
TokensAfter: estimateTokens(out),
|
||||
})
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// renderCompacted applies the fold state to msgs: [optional system] +
|
||||
// [synthetic summary] + msgs[prefixEnd:]. With no fold yet, msgs is
|
||||
// returned unchanged.
|
||||
func renderCompacted(st *compactionState, msgs []llm.Message) []llm.Message {
|
||||
if st.prefixEnd <= 0 || st.prefixEnd > len(msgs) {
|
||||
return msgs
|
||||
}
|
||||
tail := msgs[st.prefixEnd:]
|
||||
out := make([]llm.Message, 0, len(tail)+2)
|
||||
if msgs[0].Role == llm.RoleSystem {
|
||||
out = append(out, msgs[0])
|
||||
}
|
||||
out = append(out, llm.UserText(
|
||||
"[CONTEXT COMPACTED] The earlier portion of this conversation was summarised "+
|
||||
"to fit the model's context window. Summary:\n\n"+st.summaryText+
|
||||
"\n\nResume from the recent messages below."))
|
||||
out = append(out, tail...)
|
||||
return out
|
||||
}
|
||||
|
||||
// summariseMiddle composes a "compress this transcript" prompt and
|
||||
// fires one fast-tier LLM call. prevSummary (may be empty) is the
|
||||
// running summary from earlier compactions; it is folded into the new
|
||||
// summary so prior context is not lost.
|
||||
func summariseMiddle(ctx context.Context, cfg CompactorConfig, prevSummary string, middle []llm.Message) (string, error) {
|
||||
if len(middle) == 0 {
|
||||
return "", errors.New("compactor: empty middle range")
|
||||
}
|
||||
modelCtx, model, err := cfg.Models(ctx, cfg.SummarizerTier)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("compactor: resolve summarizer model %q: %w", cfg.SummarizerTier, err)
|
||||
}
|
||||
if model == nil {
|
||||
return "", errors.New("compactor: summarizer model resolved to nil")
|
||||
}
|
||||
if modelCtx != nil {
|
||||
ctx = modelCtx
|
||||
}
|
||||
|
||||
var prior string
|
||||
if strings.TrimSpace(prevSummary) != "" {
|
||||
prior = "AN EARLIER PORTION WAS ALREADY SUMMARISED AS:\n" + prevSummary +
|
||||
"\n\nFold that summary into your new one — its facts must survive.\n\n"
|
||||
}
|
||||
transcript := renderTranscript(middle)
|
||||
prompt := fmt.Sprintf(
|
||||
"You are compressing an in-flight agent's conversation transcript so the agent "+
|
||||
"can continue working without blowing its model context. The transcript below is "+
|
||||
"a sequence of tool calls and their results. Produce a single paragraph (under %d words) "+
|
||||
"that captures:\n"+
|
||||
" - WHAT the agent has been trying to accomplish.\n"+
|
||||
" - WHICH URLs were visited / fetched (list inline, comma-separated).\n"+
|
||||
" - KEY findings or decisions (factual results the agent will need later).\n"+
|
||||
" - ANY file_ids or KV keys the agent created — these are persistent state references the agent must keep.\n"+
|
||||
" - ANY errors or dead-ends that the agent should not re-try.\n"+
|
||||
"DO NOT include verbose HTTP headers, tool-call metadata, error stack traces, or repetitive content. "+
|
||||
"DO NOT add commentary or markdown headers. Output prose only.\n\n"+
|
||||
"%sTRANSCRIPT TO COMPRESS:\n%s",
|
||||
cfg.SummaryWordCap,
|
||||
prior,
|
||||
transcript,
|
||||
)
|
||||
|
||||
resp, err := model.Generate(ctx, llm.Request{Messages: []llm.Message{llm.UserText(prompt)}})
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("compactor: summarise LLM call: %w", err)
|
||||
}
|
||||
text := strings.TrimSpace(resp.Text())
|
||||
if text == "" {
|
||||
return "", errors.New("compactor: summarizer returned empty text")
|
||||
}
|
||||
return text, nil
|
||||
}
|
||||
|
||||
// estimateTokens is the chars/4 heuristic over a message slice's text,
|
||||
// tool calls, and tool results. Images count a flat ~1K tokens each.
|
||||
// It intentionally matches the coarse estimator the old agentkit loop
|
||||
// used — the 0.7 threshold ratio provides the safety margin.
|
||||
func estimateTokens(msgs []llm.Message) int {
|
||||
chars := 0
|
||||
for _, m := range msgs {
|
||||
for _, p := range m.Parts {
|
||||
switch v := p.(type) {
|
||||
case llm.TextPart:
|
||||
chars += len(v.Text)
|
||||
case llm.ImagePart:
|
||||
chars += 4096
|
||||
default:
|
||||
// llm.Part is a sealed-but-extensible interface (future media
|
||||
// kinds). Count an unknown part conservatively (like an image)
|
||||
// rather than 0, so a transcript of unrecognised content can't
|
||||
// silently slip under the compaction threshold and 400 the
|
||||
// model. Bump this if a large new part kind lands.
|
||||
_ = v
|
||||
chars += 4096
|
||||
}
|
||||
}
|
||||
for _, tc := range m.ToolCalls {
|
||||
chars += len(tc.Name) + len(tc.Arguments)
|
||||
}
|
||||
for _, tr := range m.ToolResults {
|
||||
chars += len(tr.Content)
|
||||
}
|
||||
}
|
||||
return chars / 4
|
||||
}
|
||||
|
||||
// transcriptMessageCap bounds individual message bodies at ~2KB so a
|
||||
// single ultra-long tool result can't dominate the prompt sent to the
|
||||
// summarizer.
|
||||
const transcriptMessageCap = 2048
|
||||
|
||||
// secretBearingTools name tools whose ARGUMENTS routinely carry credentials or
|
||||
// message bodies (bearer tokens, API keys, recipients, request bodies). Their
|
||||
// args are dropped before the transcript reaches the summarizer model — which
|
||||
// may be a different provider/tier than the run model — mirroring the redaction
|
||||
// run/steps.go applies to user-facing step summaries. http_* and webhook_* are
|
||||
// matched by prefix below.
|
||||
var secretBearingTools = map[string]bool{
|
||||
"mcp_call": true,
|
||||
"email_send": true,
|
||||
}
|
||||
|
||||
// redactToolArgs returns a summariser-safe rendering of a tool call's args:
|
||||
// "[redacted]" for known secret-bearing tools, the args verbatim otherwise.
|
||||
func redactToolArgs(name, args string) string {
|
||||
if secretBearingTools[name] ||
|
||||
strings.HasPrefix(name, "http_") ||
|
||||
strings.HasPrefix(name, "webhook_") {
|
||||
return "[redacted]"
|
||||
}
|
||||
return args
|
||||
}
|
||||
|
||||
// renderTranscript flattens a message slice to a plain-text transcript
|
||||
// suitable for the summarisation prompt. Tool calls show name + (redacted) args,
|
||||
// tool results show name + body. Empty fields are skipped.
|
||||
//
|
||||
// NOTE: tool-RESULT bodies are forwarded to the summarizer (the summary needs
|
||||
// the findings). A host whose tool results may contain secrets and whose
|
||||
// summarizer tier resolves to an untrusted provider should ensure that tier is
|
||||
// trusted, or pre-sanitise results before they reach the agent loop.
|
||||
func renderTranscript(msgs []llm.Message) string {
|
||||
var sb strings.Builder
|
||||
for i, m := range msgs {
|
||||
fmt.Fprintf(&sb, "---\n[%d] role=%s\n", i+1, m.Role)
|
||||
if text := m.Text(); text != "" {
|
||||
sb.WriteString(truncate(text, transcriptMessageCap))
|
||||
sb.WriteString("\n")
|
||||
}
|
||||
for _, tc := range m.ToolCalls {
|
||||
fmt.Fprintf(&sb, "tool_call name=%s args=%s\n", tc.Name, truncate(redactToolArgs(tc.Name, string(tc.Arguments)), transcriptMessageCap))
|
||||
}
|
||||
for _, tr := range m.ToolResults {
|
||||
fmt.Fprintf(&sb, "tool_result name=%s body=%s\n", tr.Name, truncate(tr.Content, transcriptMessageCap))
|
||||
}
|
||||
}
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
func truncate(s string, n int) string {
|
||||
if len(s) <= n {
|
||||
return s
|
||||
}
|
||||
return s[:n] + "...(truncated)"
|
||||
}
|
||||
+35
-13
@@ -1,27 +1,49 @@
|
||||
// Command minimal demonstrates executus's standalone core primitives available
|
||||
// today (P0): the config seam + bounded fan-out. The full zero-config "agentic
|
||||
// in ~12 lines" example arrives once the model, tool, and run packages land
|
||||
// (P1–P3).
|
||||
// Command minimal is executus's "hello, agentic world": wire a model resolver,
|
||||
// a tool registry, and the run executor, then run an agent. With no batteries
|
||||
// (Audit/Budget/Critic/Checkpointer/Palette/Delivery all nil) this is a
|
||||
// bounded, in-memory run — the light-host shape (gadfly's case).
|
||||
//
|
||||
// Run it with a provider key for the configured tier, e.g.
|
||||
//
|
||||
// ANTHROPIC_API_KEY=sk-... go run ./examples/minimal
|
||||
//
|
||||
// Override a tier from the environment without touching code, e.g.
|
||||
//
|
||||
// EXECUTUS_MODEL_TIER_FAST=openai/gpt-4o-mini ANTHROPIC_API_KEY= OPENAI_KEY=sk-... go run ./examples/minimal
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/executus/config"
|
||||
"gitea.stevedudenhoeffer.com/steve/executus/fanout"
|
||||
"gitea.stevedudenhoeffer.com/steve/executus/model"
|
||||
"gitea.stevedudenhoeffer.com/steve/executus/run"
|
||||
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
||||
)
|
||||
|
||||
func main() {
|
||||
cfg := config.Env("EXECUTUS_") // e.g. EXECUTUS_FANOUT_MAX_CONCURRENT=8
|
||||
max := cfg.Int("fanout.max_concurrent", 4)
|
||||
// 1. Configure model tiers: live values come from the environment
|
||||
// (EXECUTUS_MODEL_TIER_<NAME>), falling back to these defaults.
|
||||
model.Configure(config.Env("EXECUTUS_"), map[string]string{
|
||||
"fast": "anthropic/claude-haiku-4-5",
|
||||
"thinking": "anthropic/claude-opus-4-8",
|
||||
}, 0)
|
||||
|
||||
items := []string{"alpha", "beta", "gamma", "delta"}
|
||||
results := fanout.Run(context.Background(), items,
|
||||
fanout.Options[string]{MaxConcurrent: max},
|
||||
func(_ context.Context, s string) (int, error) { return len(s), nil })
|
||||
// 2. Build the executor: a tool registry + the model resolver. No batteries.
|
||||
ex := run.New(run.Config{
|
||||
Registry: tool.NewRegistry(),
|
||||
Models: model.ParseModelForContext,
|
||||
})
|
||||
|
||||
for _, r := range results {
|
||||
fmt.Printf("%-6s -> %d (err=%v)\n", items[r.Index], r.Value, r.Err)
|
||||
// 3. Run an agent and print its answer.
|
||||
res := ex.Run(context.Background(),
|
||||
run.RunnableAgent{Name: "assistant", SystemPrompt: "You are concise.", ModelTier: "fast"},
|
||||
tool.Invocation{RunID: "demo-1", CallerID: "local"},
|
||||
"In one sentence, what is an agent harness?")
|
||||
if res.Err != nil {
|
||||
log.Fatalf("run failed: %v", res.Err)
|
||||
}
|
||||
fmt.Println(res.Output)
|
||||
}
|
||||
|
||||
+4
-1
@@ -34,6 +34,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
@@ -574,7 +575,9 @@ func (h *Helper) recordLedger(ctx context.Context, call MetaCall) {
|
||||
if h.storage == nil {
|
||||
return
|
||||
}
|
||||
_ = h.storage.RecordMetaCall(ctx, call)
|
||||
if err := h.storage.RecordMetaCall(ctx, call); err != nil {
|
||||
slog.Warn("llmmeta: failed to record ledger row", "err", err)
|
||||
}
|
||||
}
|
||||
|
||||
// tryParseJSON attempts to decode text as JSON. Returns the parsed
|
||||
|
||||
+1
-1
@@ -237,7 +237,7 @@ func recordUsage(ctx context.Context, resp *llm.Response) {
|
||||
return
|
||||
}
|
||||
u := resp.Usage
|
||||
if u.InputTokens == 0 && u.OutputTokens == 0 {
|
||||
if u.InputTokens == 0 && u.OutputTokens == 0 && u.CacheReadTokens == 0 && u.CacheWriteTokens == 0 {
|
||||
return
|
||||
}
|
||||
model := resolvedModelName(ctx, resp)
|
||||
|
||||
+23
-2
@@ -314,7 +314,7 @@ func (c *CloudOllamaLimitCache) fetchTags(ctx context.Context) ([]string, error)
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
body, err := readCapped(resp.Body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -367,7 +367,7 @@ func (c *CloudOllamaLimitCache) fetchContextLength(ctx context.Context, modelNam
|
||||
return 0, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
respBody, err := io.ReadAll(resp.Body)
|
||||
respBody, err := readCapped(resp.Body)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
@@ -451,3 +451,24 @@ func truncate(b []byte, n int) string {
|
||||
}
|
||||
return string(b[:n]) + "...(truncated)"
|
||||
}
|
||||
|
||||
// maxLimitCacheResponseBytes bounds the ollama.com limit-cache HTTP responses
|
||||
// (/api/tags, /api/show) so a misbehaving endpoint can't stream an unbounded
|
||||
// body before the 15s timeout fires. 1 MiB is far above any real response.
|
||||
const maxLimitCacheResponseBytes = 1 << 20
|
||||
|
||||
// readCapped reads up to maxLimitCacheResponseBytes from r and returns a clear
|
||||
// error if the response EXCEEDS the cap — rather than silently truncating (as a
|
||||
// bare io.LimitReader does) and letting downstream json.Unmarshal fail with an
|
||||
// opaque "unexpected end of JSON input". It reads one extra byte to detect the
|
||||
// overflow.
|
||||
func readCapped(r io.Reader) ([]byte, error) {
|
||||
body, err := io.ReadAll(io.LimitReader(r, maxLimitCacheResponseBytes+1))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(body) > maxLimitCacheResponseBytes {
|
||||
return nil, fmt.Errorf("cloud_sync: response exceeded %d bytes", maxLimitCacheResponseBytes)
|
||||
}
|
||||
return body, nil
|
||||
}
|
||||
|
||||
@@ -16,6 +16,11 @@ import (
|
||||
// UsageSink receives one record per successful Generate through a model parsed
|
||||
// by this package (ParseModelRequest / ParseModelForContext). Implement it to
|
||||
// meter or bill; the token detail mirrors majordomo's Response.Usage.
|
||||
//
|
||||
// IMPORTANT: cacheReadTokens and cacheWriteTokens are PORTIONS of inputTokens,
|
||||
// not independent additive values (they let a sink price cached vs fresh input
|
||||
// differently). A sink must NOT compute total = input+output+cacheRead+
|
||||
// cacheWrite — that double-counts the cached input.
|
||||
type UsageSink interface {
|
||||
Record(ctx context.Context, model string, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens int)
|
||||
}
|
||||
|
||||
@@ -0,0 +1,76 @@
|
||||
package run
|
||||
|
||||
import "time"
|
||||
|
||||
// RunnableAgent is the kernel's view of "a thing to run": an identity, a model
|
||||
// tier, a system prompt, execution caps, and a tool palette. It is a plain DTO
|
||||
// on purpose — the run kernel never imports a noun battery. The persona Agent
|
||||
// and the saved Skill each LOWER themselves into a RunnableAgent (a ToRunnable
|
||||
// method on the battery side), and the kernel runs the DTO. This is the
|
||||
// inversion of mort's agentexec.Executor.Run(*agents.Agent): the executor no
|
||||
// longer depends on the persona struct, only on this shape.
|
||||
//
|
||||
// A light host can build a RunnableAgent inline (model tier + prompt + a few
|
||||
// tool names) for a one-shot bounded run, with no persona or skill battery at
|
||||
// all — that is exactly gadfly's swarm task.
|
||||
type RunnableAgent struct {
|
||||
// ID is a stable identifier for the run subject (an agent/skill UUID, or
|
||||
// any host-chosen id). Used for audit attribution and dispatch-guard
|
||||
// genealogy. Empty is allowed for anonymous one-shot runs.
|
||||
ID string
|
||||
|
||||
// Name is a human label (audit/logs/delivery). Empty is allowed.
|
||||
Name string
|
||||
|
||||
// SystemPrompt is the agent's base system prompt (before per-run
|
||||
// personalization, which a host layers via Ports).
|
||||
SystemPrompt string
|
||||
|
||||
// ModelTier is a tier alias or concrete spec resolved through
|
||||
// model.ParseModelForContext. Empty resolves to the host's default tier.
|
||||
ModelTier string
|
||||
|
||||
// MaxIterations caps the agent loop's tool-dispatch steps. 0 = kernel
|
||||
// default. MaxRuntime caps wall-clock for the whole run (the kernel starts
|
||||
// this clock AFTER any lane dequeue, not at submission). 0 = kernel
|
||||
// default.
|
||||
MaxIterations int
|
||||
MaxRuntime time.Duration
|
||||
|
||||
// LowLevelTools are tool-registry names the run may call directly.
|
||||
// SkillPalette / SubAgentPalette name saved skills / sub-agents exposed as
|
||||
// skill__<name> / agent__<name> delegation tools, resolved through
|
||||
// Ports.Palette (nil Palette => those entries are inert).
|
||||
LowLevelTools []string
|
||||
SkillPalette []string
|
||||
SubAgentPalette []string
|
||||
|
||||
// Phases optionally model a multi-step pipeline (each phase its own prompt
|
||||
// + tier + tools). An empty slice is a single-phase run — the common case.
|
||||
Phases []Phase
|
||||
|
||||
// Critic configures the optional two-tier run-critic (Ports.Critic). The
|
||||
// zero value (disabled) is the light-host default.
|
||||
Critic CriticConfig
|
||||
}
|
||||
|
||||
// Phase is one step of a multi-step run: its own system prompt, model tier,
|
||||
// iteration cap, and tool subset. Optional phases may be skipped by the
|
||||
// pipeline when their precondition isn't met.
|
||||
type Phase struct {
|
||||
Name string
|
||||
SystemPrompt string
|
||||
ModelTier string
|
||||
MaxIterations int
|
||||
Tools []string
|
||||
Optional bool
|
||||
}
|
||||
|
||||
// CriticConfig configures the optional run-critic. Enabled gates whether a
|
||||
// critic monitor is started at all; BackstopMultiplier sets the hard-kill
|
||||
// deadline as a multiple of the soft trigger (MaxRuntime). A non-positive
|
||||
// multiplier uses the kernel default.
|
||||
type CriticConfig struct {
|
||||
Enabled bool
|
||||
BackstopMultiplier float64
|
||||
}
|
||||
+318
@@ -0,0 +1,318 @@
|
||||
package run
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/agent"
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/executus/compact"
|
||||
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
||||
)
|
||||
|
||||
// ModelResolver resolves a tier alias or concrete spec to a usable llm.Model
|
||||
// and an enriched context (for usage attribution). model.ParseModelForContext
|
||||
// satisfies it.
|
||||
type ModelResolver func(ctx context.Context, tier string) (context.Context, llm.Model, error)
|
||||
|
||||
// Defaults are the executor's fallback caps and loop guards, applied per run
|
||||
// when the RunnableAgent leaves a field zero.
|
||||
type Defaults struct {
|
||||
MaxIterations int // tool-dispatch steps; default 12
|
||||
MaxRuntime time.Duration // wall-clock per run; default 60s
|
||||
FallbackTier string // tier when the agent's is empty; default "fast"
|
||||
MaxConsecutiveToolErrors int // loop guard; default 3
|
||||
MaxSameToolCallRepeats int // retry-storm guard; default 3
|
||||
CompactionThresholdRatio float64 // fraction of model context to compact at; default 0.7
|
||||
}
|
||||
|
||||
func (d Defaults) withFallbacks() Defaults {
|
||||
if d.MaxIterations <= 0 {
|
||||
d.MaxIterations = 12
|
||||
}
|
||||
if d.MaxRuntime <= 0 {
|
||||
d.MaxRuntime = 60 * time.Second
|
||||
}
|
||||
if d.FallbackTier == "" {
|
||||
d.FallbackTier = "fast"
|
||||
}
|
||||
if d.MaxConsecutiveToolErrors <= 0 {
|
||||
d.MaxConsecutiveToolErrors = 3
|
||||
}
|
||||
if d.MaxSameToolCallRepeats <= 0 {
|
||||
d.MaxSameToolCallRepeats = 3
|
||||
}
|
||||
if d.CompactionThresholdRatio <= 0 {
|
||||
d.CompactionThresholdRatio = 0.7
|
||||
}
|
||||
return d
|
||||
}
|
||||
|
||||
// Config wires an Executor. Registry + Models are required; everything else is
|
||||
// optional and nil-safe — the zero Config beyond those yields a bounded,
|
||||
// in-memory run with no persistence/audit/budget/critic/delegation/compaction
|
||||
// (gadfly's case).
|
||||
type Config struct {
|
||||
Registry tool.Registry
|
||||
Models ModelResolver
|
||||
Defaults Defaults
|
||||
Ports Ports
|
||||
|
||||
// Compactor mints the per-run context-compaction hook. nil disables
|
||||
// compaction. ContextTokens resolves a tier's model context-window (for
|
||||
// the compaction threshold); nil — or a zero return — also disables it.
|
||||
Compactor compact.CompactorFactory
|
||||
ContextTokens func(tier string) int
|
||||
|
||||
// SystemHeader is an optional platform header prepended to every agent's
|
||||
// system prompt.
|
||||
SystemHeader string
|
||||
}
|
||||
|
||||
// Executor runs a RunnableAgent through majordomo's agent loop with the wired
|
||||
// Ports. Construct with New; safe for concurrent use across runs.
|
||||
type Executor struct {
|
||||
cfg Config
|
||||
}
|
||||
|
||||
// New builds an Executor. It panics if Registry or Models is nil — those are
|
||||
// structural, not runtime, errors.
|
||||
func New(cfg Config) *Executor {
|
||||
if cfg.Registry == nil || cfg.Models == nil {
|
||||
panic("run.New: Registry and Models are required")
|
||||
}
|
||||
cfg.Defaults = cfg.Defaults.withFallbacks()
|
||||
return &Executor{cfg: cfg}
|
||||
}
|
||||
|
||||
// Result is one run's outcome. Err carries the run failure (if any); the other
|
||||
// fields are populated best-effort even on error (partial output/steps/usage).
|
||||
type Result struct {
|
||||
RunID string
|
||||
Output string
|
||||
Steps []tool.Step
|
||||
Usage llm.Usage
|
||||
Err error
|
||||
}
|
||||
|
||||
// Run executes ra with the given invocation + input and returns the Result. It
|
||||
// never propagates a panic; failures surface in Result.Err.
|
||||
func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocation, input string) Result {
|
||||
started := time.Now()
|
||||
res := Result{RunID: inv.RunID}
|
||||
|
||||
tier := ra.ModelTier
|
||||
if tier == "" {
|
||||
tier = e.cfg.Defaults.FallbackTier
|
||||
}
|
||||
maxIter := ra.MaxIterations
|
||||
if maxIter <= 0 {
|
||||
maxIter = e.cfg.Defaults.MaxIterations
|
||||
}
|
||||
maxRuntime := ra.MaxRuntime
|
||||
if maxRuntime <= 0 {
|
||||
maxRuntime = e.cfg.Defaults.MaxRuntime
|
||||
}
|
||||
|
||||
// Budget gate (pre-run): a rejected run makes no model call.
|
||||
if e.cfg.Ports.Budget != nil {
|
||||
if err := e.cfg.Ports.Budget.Check(ctx, inv.CallerID); err != nil {
|
||||
res.Err = err
|
||||
return res
|
||||
}
|
||||
}
|
||||
|
||||
// Resolve the model (enriches ctx for usage attribution).
|
||||
modelCtx, model, err := e.cfg.Models(ctx, tier)
|
||||
if err != nil {
|
||||
res.Err = fmt.Errorf("resolve model %q: %w", tier, err)
|
||||
return res
|
||||
}
|
||||
if model == nil {
|
||||
// A resolver returning (ctx, nil, nil) would otherwise nil-panic inside
|
||||
// the agent loop; surface it as a clean error (Run never panics out).
|
||||
res.Err = fmt.Errorf("resolve model %q: resolver returned a nil model", tier)
|
||||
return res
|
||||
}
|
||||
ctx = modelCtx
|
||||
|
||||
// Audit start (optional). The recorder satisfies RunTally; stamp it on the
|
||||
// invocation so a self-status tool can read live progress.
|
||||
var rec RunRecorder
|
||||
var stateAcc *RunStateAccessor
|
||||
if e.cfg.Ports.Audit != nil {
|
||||
rec = e.cfg.Ports.Audit.StartRun(ctx, RunInfo{
|
||||
RunID: inv.RunID,
|
||||
SubjectID: ra.ID,
|
||||
Name: ra.Name,
|
||||
CallerID: inv.CallerID,
|
||||
ChannelID: inv.ChannelID,
|
||||
ParentRunID: inv.ParentRunID,
|
||||
Inputs: inv.SkillInputs,
|
||||
StartedAt: started,
|
||||
})
|
||||
}
|
||||
if rec != nil {
|
||||
stateAcc = NewRunStateAccessor(rec, maxIter, 0, started)
|
||||
inv.RunState = stateAcc
|
||||
}
|
||||
|
||||
// Build the toolbox from the agent's low-level tools.
|
||||
toolbox, err := e.cfg.Registry.Build(ra.LowLevelTools, inv, tool.Visibility("private"), nil)
|
||||
if err != nil {
|
||||
res.Err = fmt.Errorf("build toolbox: %w", err)
|
||||
e.finishAudit(ctx, rec, "error", res, started, res.Err)
|
||||
return res
|
||||
}
|
||||
|
||||
// Run context: bound by MaxRuntime, detached from the caller's deadline so a
|
||||
// lane/queue wait doesn't eat the run budget (mort's V10 lesson). Caller
|
||||
// cancellation still propagates via MergeCancellation. Created BEFORE the
|
||||
// step observer so the observer forwards the merged run context (not a
|
||||
// possibly-cancelled caller ctx) to OnStep consumers.
|
||||
runCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), maxRuntime)
|
||||
defer cancel()
|
||||
runCtx, mergeCancel := MergeCancellation(runCtx, ctx)
|
||||
defer mergeCancel()
|
||||
|
||||
// Step instrumentation: accumulate Result.Steps + fire inv.OnStep, feed the
|
||||
// audit recorder, and keep the live iteration counter fresh. majordomo's
|
||||
// step observer hands us each completed iteration; we zip the model's tool
|
||||
// calls with their executed results PAIRWISE — a result without a matching
|
||||
// call (or a call without a result) is skipped rather than recorded as an
|
||||
// empty-name "ghost" step.
|
||||
emitter := newStepEmitter(inv.OnStep)
|
||||
stepObserver := func(s agent.Step) {
|
||||
if stateAcc != nil {
|
||||
stateAcc.SetIteration(s.Index)
|
||||
}
|
||||
if rec != nil {
|
||||
rec.OnStep(s.Index, s.Response)
|
||||
}
|
||||
var calls []llm.ToolCall
|
||||
if s.Response != nil {
|
||||
calls = s.Response.ToolCalls
|
||||
}
|
||||
n := len(s.Results)
|
||||
if len(calls) < n {
|
||||
n = len(calls)
|
||||
}
|
||||
for i := 0; i < n; i++ {
|
||||
call, r := calls[i], s.Results[i]
|
||||
emitter.toolStart(runCtx, call.Name, call.Arguments)
|
||||
emitter.toolEnd(runCtx, call, r.Content, r.IsError)
|
||||
if rec != nil {
|
||||
rec.OnTool(call, r.Content)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
opts := []agent.Option{
|
||||
agent.WithToolbox(toolbox),
|
||||
agent.WithMaxSteps(maxIter),
|
||||
agent.WithToolErrorLimits(e.cfg.Defaults.MaxConsecutiveToolErrors, e.cfg.Defaults.MaxSameToolCallRepeats),
|
||||
agent.WithStepObserver(stepObserver),
|
||||
}
|
||||
if e.cfg.Compactor != nil && e.cfg.ContextTokens != nil {
|
||||
if threshold := e.compactionThreshold(tier); threshold > 0 {
|
||||
// Forward compaction events to the audit log (makes the
|
||||
// CompactionEvent doc's "logged to the run trace" promise true).
|
||||
var onFire func(compact.CompactionEvent)
|
||||
if rec != nil {
|
||||
onFire = func(ev compact.CompactionEvent) {
|
||||
rec.LogEvent("compaction_fired", map[string]any{
|
||||
"messages_before": ev.MessagesBefore,
|
||||
"messages_after": ev.MessagesAfter,
|
||||
"tokens_before": ev.TokensBefore,
|
||||
"tokens_after": ev.TokensAfter,
|
||||
})
|
||||
}
|
||||
}
|
||||
opts = append(opts, agent.WithCompactor(e.cfg.Compactor(threshold, onFire)))
|
||||
}
|
||||
}
|
||||
|
||||
ag := agent.New(model, e.systemPrompt(ra), opts...)
|
||||
runRes, runErr := ag.Run(runCtx, input)
|
||||
|
||||
status := statusFor(runErr)
|
||||
if runRes != nil {
|
||||
res.Output = runRes.Output
|
||||
res.Usage = runRes.Usage
|
||||
}
|
||||
res.Steps = emitter.snapshot()
|
||||
res.Err = runErr
|
||||
|
||||
e.finishAudit(ctx, rec, status, res, started, runErr)
|
||||
if e.cfg.Ports.Budget != nil {
|
||||
e.cfg.Ports.Budget.Commit(detach(ctx), inv.CallerID, time.Since(started).Seconds())
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
||||
// statusFor maps a run error to a RunStats.Status, distinguishing a deadline
|
||||
// (timeout) and a cancellation (cancelled — caller cancel or shutdown) from a
|
||||
// generic error so audit consumers can tell them apart.
|
||||
func statusFor(runErr error) string {
|
||||
switch {
|
||||
case runErr == nil:
|
||||
return "ok"
|
||||
case errors.Is(runErr, context.DeadlineExceeded):
|
||||
return "timeout"
|
||||
case errors.Is(runErr, context.Canceled):
|
||||
return "cancelled"
|
||||
default:
|
||||
return "error"
|
||||
}
|
||||
}
|
||||
|
||||
// finishAudit writes the terminal roll-up on a detached context so a cancelled
|
||||
// run still records (mort's CleanupContextTimeout lesson).
|
||||
func (e *Executor) finishAudit(ctx context.Context, rec RunRecorder, status string, res Result, started time.Time, runErr error) {
|
||||
if rec == nil {
|
||||
return
|
||||
}
|
||||
stats := RunStats{
|
||||
Status: status,
|
||||
Output: res.Output,
|
||||
ToolCalls: rec.ToolCallsCount(),
|
||||
RuntimeSeconds: time.Since(started).Seconds(),
|
||||
}
|
||||
if runErr != nil {
|
||||
stats.Error = runErr.Error()
|
||||
}
|
||||
stats.InputTokens, stats.OutputTokens, stats.ThinkingTokens = rec.TokenStats()
|
||||
rec.Close(detach(ctx), stats)
|
||||
}
|
||||
|
||||
func (e *Executor) systemPrompt(ra RunnableAgent) string {
|
||||
if e.cfg.SystemHeader == "" {
|
||||
return ra.SystemPrompt
|
||||
}
|
||||
if ra.SystemPrompt == "" {
|
||||
return e.cfg.SystemHeader
|
||||
}
|
||||
return e.cfg.SystemHeader + "\n\n" + ra.SystemPrompt
|
||||
}
|
||||
|
||||
// compactionThreshold returns the token threshold for the tier's model context
|
||||
// window (ratio × limit), or 0 when the limit is unknown.
|
||||
func (e *Executor) compactionThreshold(tier string) int {
|
||||
max := e.cfg.ContextTokens(tier)
|
||||
if max <= 0 {
|
||||
return 0
|
||||
}
|
||||
return int(float64(max) * e.cfg.Defaults.CompactionThresholdRatio)
|
||||
}
|
||||
|
||||
// detach derives a bounded cleanup context off ctx, detached from its
|
||||
// cancellation, for post-run writes. The cancel is intentionally not returned;
|
||||
// CleanupContextTimeout bounds the lifetime.
|
||||
func detach(ctx context.Context) context.Context {
|
||||
c, cancel := context.WithTimeout(context.WithoutCancel(ctx), CleanupContextTimeout)
|
||||
_ = cancel // bounded by the timeout; nothing to cancel early
|
||||
return c
|
||||
}
|
||||
@@ -0,0 +1,168 @@
|
||||
package run
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
||||
)
|
||||
|
||||
// fakeModels returns a ModelResolver backed by a fake provider scripted to
|
||||
// reply with the given text (no tool calls — the loop terminates immediately).
|
||||
func fakeModels(t *testing.T, reply string) ModelResolver {
|
||||
t.Helper()
|
||||
fp := fake.New("fake")
|
||||
fp.Enqueue("test-model", fake.Reply(reply))
|
||||
m, err := fp.Model("test-model")
|
||||
if err != nil {
|
||||
t.Fatalf("fake model: %v", err)
|
||||
}
|
||||
return func(ctx context.Context, _ string) (context.Context, llm.Model, error) {
|
||||
return ctx, m, nil
|
||||
}
|
||||
}
|
||||
|
||||
// TestExecutorRunHelloWorld is the milestone: executus runs an agent end-to-end
|
||||
// against the fake provider and returns its output. Proves the kernel is
|
||||
// runnable with the zero Ports (no persistence/audit/budget/critic).
|
||||
func TestExecutorRunHelloWorld(t *testing.T) {
|
||||
ex := New(Config{
|
||||
Registry: tool.NewRegistry(),
|
||||
Models: fakeModels(t, "hello from executus"),
|
||||
})
|
||||
|
||||
res := ex.Run(context.Background(),
|
||||
RunnableAgent{Name: "greeter", SystemPrompt: "be brief", ModelTier: "test-model"},
|
||||
tool.Invocation{RunID: "run-1", CallerID: "caller-1"},
|
||||
"say hi")
|
||||
|
||||
if res.Err != nil {
|
||||
t.Fatalf("run error: %v", res.Err)
|
||||
}
|
||||
if res.Output != "hello from executus" {
|
||||
t.Fatalf("output = %q, want %q", res.Output, "hello from executus")
|
||||
}
|
||||
if res.RunID != "run-1" {
|
||||
t.Errorf("RunID = %q, want run-1", res.RunID)
|
||||
}
|
||||
}
|
||||
|
||||
// TestExecutorBudgetRejection: a Budget that denies makes no model call.
|
||||
func TestExecutorBudgetRejection(t *testing.T) {
|
||||
denied := errors.New("over budget")
|
||||
var modelCalled bool
|
||||
models := func(ctx context.Context, _ string) (context.Context, llm.Model, error) {
|
||||
modelCalled = true
|
||||
return ctx, nil, nil
|
||||
}
|
||||
ex := New(Config{
|
||||
Registry: tool.NewRegistry(),
|
||||
Models: models,
|
||||
Ports: Ports{Budget: budgetFunc{check: func(string) error { return denied }}},
|
||||
})
|
||||
|
||||
res := ex.Run(context.Background(),
|
||||
RunnableAgent{ModelTier: "test-model"},
|
||||
tool.Invocation{RunID: "r", CallerID: "broke"}, "hi")
|
||||
|
||||
if !errors.Is(res.Err, denied) {
|
||||
t.Fatalf("err = %v, want budget denial", res.Err)
|
||||
}
|
||||
if modelCalled {
|
||||
t.Error("model must not be resolved/called when budget denies")
|
||||
}
|
||||
}
|
||||
|
||||
// TestExecutorAuditWiring: the Audit port receives StartRun + Close with the
|
||||
// terminal status/output.
|
||||
func TestExecutorAuditWiring(t *testing.T) {
|
||||
rec := &captureRecorder{}
|
||||
ex := New(Config{
|
||||
Registry: tool.NewRegistry(),
|
||||
Models: fakeModels(t, "done"),
|
||||
Ports: Ports{Audit: auditFunc{start: func(RunInfo) RunRecorder { return rec }}},
|
||||
})
|
||||
|
||||
res := ex.Run(context.Background(),
|
||||
RunnableAgent{ModelTier: "test-model"},
|
||||
tool.Invocation{RunID: "r2", CallerID: "c"}, "go")
|
||||
|
||||
if res.Err != nil {
|
||||
t.Fatalf("run error: %v", res.Err)
|
||||
}
|
||||
if !rec.closed {
|
||||
t.Fatal("recorder.Close was not called")
|
||||
}
|
||||
if rec.stats.Status != "ok" {
|
||||
t.Errorf("close status = %q, want ok", rec.stats.Status)
|
||||
}
|
||||
if rec.stats.Output != "done" {
|
||||
t.Errorf("close output = %q, want done", rec.stats.Output)
|
||||
}
|
||||
}
|
||||
|
||||
// --- test doubles ---
|
||||
|
||||
type budgetFunc struct{ check func(callerID string) error }
|
||||
|
||||
func (b budgetFunc) Check(_ context.Context, callerID string) error { return b.check(callerID) }
|
||||
func (b budgetFunc) Commit(context.Context, string, float64) {}
|
||||
|
||||
type auditFunc struct{ start func(RunInfo) RunRecorder }
|
||||
|
||||
func (a auditFunc) StartRun(_ context.Context, info RunInfo) RunRecorder { return a.start(info) }
|
||||
|
||||
type captureRecorder struct {
|
||||
closed bool
|
||||
stats RunStats
|
||||
steps int
|
||||
tools int
|
||||
}
|
||||
|
||||
func (r *captureRecorder) TokenStats() (in, out, thinking int64) { return 0, 0, 0 }
|
||||
func (r *captureRecorder) ToolCallsCount() int { return r.tools }
|
||||
func (r *captureRecorder) OnStep(int, *llm.Response) { r.steps++ }
|
||||
func (r *captureRecorder) OnTool(llm.ToolCall, string) { r.tools++ }
|
||||
func (r *captureRecorder) LogEvent(string, map[string]any) {}
|
||||
func (r *captureRecorder) LogError(string) {}
|
||||
func (r *captureRecorder) Close(_ context.Context, s RunStats) { r.closed = true; r.stats = s }
|
||||
|
||||
// TestExecutorNilModelNoPanic: a resolver returning (ctx, nil, nil) yields a
|
||||
// clean error, not a nil-pointer panic (gadfly F1, high severity).
|
||||
func TestExecutorNilModelNoPanic(t *testing.T) {
|
||||
ex := New(Config{
|
||||
Registry: tool.NewRegistry(),
|
||||
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) {
|
||||
return ctx, nil, nil // nil model, nil error
|
||||
},
|
||||
})
|
||||
res := ex.Run(context.Background(),
|
||||
RunnableAgent{ModelTier: "x"}, tool.Invocation{RunID: "r"}, "hi")
|
||||
if res.Err == nil {
|
||||
t.Fatal("expected an error for a nil model, got nil (would have panicked in the loop)")
|
||||
}
|
||||
}
|
||||
|
||||
// TestStatusFor maps run errors to RunStats.Status (gadfly F3).
|
||||
func TestStatusFor(t *testing.T) {
|
||||
cases := []struct {
|
||||
err error
|
||||
want string
|
||||
}{
|
||||
{nil, "ok"},
|
||||
{context.DeadlineExceeded, "timeout"},
|
||||
{context.Canceled, "cancelled"},
|
||||
{fmt.Errorf("wrapped: %w", context.DeadlineExceeded), "timeout"},
|
||||
{errors.New("boom"), "error"},
|
||||
}
|
||||
for _, c := range cases {
|
||||
if got := statusFor(c.err); got != c.want {
|
||||
t.Errorf("statusFor(%v) = %q, want %q", c.err, got, c.want)
|
||||
}
|
||||
}
|
||||
}
|
||||
+168
@@ -0,0 +1,168 @@
|
||||
package run
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/executus/deliver"
|
||||
)
|
||||
|
||||
// Ports are the host seams the run executor consumes. Every field is nil-safe:
|
||||
// a light host passes the zero Ports and gets a bounded, in-memory run with no
|
||||
// persistence, audit, budget, critic, delegation, or delivery — which is
|
||||
// exactly a gadfly swarm task. A heavy host (mort) wires each one to a battery.
|
||||
//
|
||||
// This struct IS the inversion: in mort, agentexec imports agents /
|
||||
// agentcritic / skillaudit and skillexec imports skills / paste directly; here
|
||||
// the kernel depends only on these interfaces, and the batteries implement
|
||||
// them. The mort_*_adapters.go wall becomes the set of impls.
|
||||
type Ports struct {
|
||||
// Audit records the run trace (start, per-step/per-tool events, final
|
||||
// stats). nil = no audit.
|
||||
Audit Audit
|
||||
// Budget gates and meters per-caller resource use. nil = unbounded.
|
||||
Budget Budget
|
||||
// Critic optionally monitors a long run for hangs/runaways. nil = none.
|
||||
Critic Critic
|
||||
// Checkpointer persists resumable progress for durable recovery. nil = no
|
||||
// checkpointing (a run interrupted by shutdown is simply lost).
|
||||
Checkpointer Checkpointer
|
||||
// Palette resolves SkillPalette / SubAgentPalette entries into delegation
|
||||
// tools (skill__<name> / agent__<name>). nil = those entries are inert.
|
||||
Palette PaletteSource
|
||||
// Delivery is where the run's output + artifacts go. nil = the caller
|
||||
// reads the Result in-process (the light-host default).
|
||||
Delivery deliver.Delivery
|
||||
}
|
||||
|
||||
// RunInfo describes a run at start time — the attribution a recorder/critic
|
||||
// needs. Host-neutral rename of mort's SkillRun start fields.
|
||||
type RunInfo struct {
|
||||
RunID string
|
||||
SubjectID string // the agent/skill id being run (audit "skill_id")
|
||||
Name string
|
||||
CallerID string
|
||||
ChannelID string
|
||||
ParentRunID string
|
||||
Inputs map[string]any
|
||||
StartedAt time.Time
|
||||
}
|
||||
|
||||
// RunStats is the terminal roll-up a recorder's Close writes. Mirrors mort's
|
||||
// skillaudit/skillexec RunStats.
|
||||
type RunStats struct {
|
||||
Status string // ok | error | timeout | budget_exceeded | cancelled | dry_run
|
||||
Output string
|
||||
Error string
|
||||
ToolCalls int
|
||||
RuntimeSeconds float64
|
||||
InputTokens int64
|
||||
OutputTokens int64
|
||||
ThinkingTokens int64
|
||||
}
|
||||
|
||||
// --- Audit ---
|
||||
|
||||
// Audit begins recording a run. StartRun returns a per-run RunRecorder (or nil
|
||||
// to skip recording this run). The audit battery wires its Storage behind this.
|
||||
type Audit interface {
|
||||
StartRun(ctx context.Context, info RunInfo) RunRecorder
|
||||
}
|
||||
|
||||
// RunRecorder records the events of one in-flight run and its final stats. It
|
||||
// satisfies RunTally so the kernel can surface live token/tool counts to the
|
||||
// self-status tool. Mirrors mort's skillaudit.Writer.
|
||||
type RunRecorder interface {
|
||||
RunTally
|
||||
// OnStep records one completed agent-loop iteration's model response.
|
||||
OnStep(iter int, resp *llm.Response)
|
||||
// OnTool records one executed tool call + its result.
|
||||
OnTool(call llm.ToolCall, result string)
|
||||
// LogEvent / LogError append structured events to the run log.
|
||||
LogEvent(eventType string, payload map[string]any)
|
||||
LogError(msg string)
|
||||
// Close writes the terminal roll-up. Detaches from the caller's context
|
||||
// internally so a cancelled run still records.
|
||||
Close(ctx context.Context, stats RunStats)
|
||||
}
|
||||
|
||||
// --- Budget ---
|
||||
|
||||
// Budget gates and meters per-caller resource use. Mirrors mort's
|
||||
// skillexec.BudgetTracker.
|
||||
type Budget interface {
|
||||
// Check reports whether the caller has remaining budget (nil = allowed).
|
||||
Check(ctx context.Context, callerID string) error
|
||||
// Commit records that the caller spent runtimeSeconds on this run.
|
||||
Commit(ctx context.Context, callerID string, runtimeSeconds float64)
|
||||
}
|
||||
|
||||
// --- Critic ---
|
||||
|
||||
// Critic optionally monitors a long-running run (the two-tier soft/hard
|
||||
// timeout). Monitor returns a handle the executor feeds progress into and
|
||||
// queries for steer/deadline decisions; a nil handle means "not monitored".
|
||||
//
|
||||
// The exact wiring (how the handle's Steer/Deadline bind into majordomo's
|
||||
// agent.WithSteer / agent.WithMaxStepsFunc / run-context cancellation) is
|
||||
// finalized in the executor; this is the seam the agentcritic battery adapts.
|
||||
type Critic interface {
|
||||
Monitor(ctx context.Context, info RunInfo, softTimeout time.Duration) CriticHandle
|
||||
}
|
||||
|
||||
// CriticHandle is the executor's live link to a run's critic.
|
||||
type CriticHandle interface {
|
||||
// RecordStep / RecordToolStart keep the critic's activity clock fresh so a
|
||||
// healthy-but-slow run is not mistaken for a hang.
|
||||
RecordStep(iter int)
|
||||
RecordToolStart(name, args string)
|
||||
// Steer returns any messages the critic wants injected into the loop (a
|
||||
// nudge), drained before each step — matches majordomo agent.WithSteer.
|
||||
Steer() []llm.Message
|
||||
// Deadline returns the current hard-kill deadline (the critic may extend
|
||||
// it); the executor binds the run context to it. Zero = no hard deadline.
|
||||
Deadline() time.Time
|
||||
// Stop ends monitoring when the run finishes.
|
||||
Stop()
|
||||
}
|
||||
|
||||
// --- Checkpointer ---
|
||||
|
||||
// Checkpointer persists a run's resumable progress for durable recovery.
|
||||
// Mirrors mort's agentexec.RunCheckpointer.
|
||||
type Checkpointer interface {
|
||||
// Save persists the run's current resumable progress (throttled).
|
||||
Save(ctx context.Context, st RunCheckpointState) error
|
||||
// Complete clears the checkpoint on success.
|
||||
Complete(ctx context.Context) error
|
||||
// Fail clears the checkpoint on terminal failure. A run interrupted by
|
||||
// shutdown is left untouched so boot recovery picks it up.
|
||||
Fail(ctx context.Context, err error) error
|
||||
}
|
||||
|
||||
// RunCheckpointState is the resumable snapshot a Checkpointer persists. Kept
|
||||
// minimal here; the executor extends what it records during the merge.
|
||||
type RunCheckpointState struct {
|
||||
Messages []llm.Message
|
||||
Iteration int
|
||||
}
|
||||
|
||||
// --- PaletteSource ---
|
||||
|
||||
// PaletteSource resolves a RunnableAgent's SkillPalette / SubAgentPalette names
|
||||
// into delegation tools and invokes them. Mirrors mort's
|
||||
// SkillInvokerForPalette + AgentInvokerForPalette. nil Palette => palette
|
||||
// entries are inert ("not configured" at first call).
|
||||
type PaletteSource interface {
|
||||
ResolveSkill(ctx context.Context, callerID, name string) (skillID string, err error)
|
||||
InvokeSkill(ctx context.Context, callerID, channelID, name string,
|
||||
inputs map[string]any, parentRunID string) (output, runID, status string, err error)
|
||||
|
||||
ResolveAgent(ctx context.Context, callerID, name string) (agentID string, err error)
|
||||
InvokeAgent(ctx context.Context, callerID, channelID, name string,
|
||||
prompt, parentRunID, modelTierOverride, promptPrepend string,
|
||||
toolsSubset []string,
|
||||
onEvent func(ctx context.Context, event, emoji string)) (output, runID, status string, err error)
|
||||
}
|
||||
@@ -0,0 +1,157 @@
|
||||
// Package run is executus's run kernel: the shared run-loop mechanics around
|
||||
// majordomo's agent loop, plus the host seams (run.Ports / RunnableAgent) that
|
||||
// let one executor serve every surface — a light host's bounded one-shot run,
|
||||
// a heavy host's persona agent or saved skill — without the kernel importing a
|
||||
// battery.
|
||||
//
|
||||
// This file holds the genuinely-identical scaffolding both run shapes need:
|
||||
// context cancellation merging, the detached-cleanup timeout, the per-run
|
||||
// progress accessor the self-status tool reads, the legacy `submit`
|
||||
// compatibility tool (submit.go), the ancestor progress bridge (progress.go),
|
||||
// and the run-finalizer machinery — one source of truth.
|
||||
//
|
||||
// The kernel depends only on majordomo + executus/tool + the run.Ports
|
||||
// interfaces; persistence, audit, the persona/skill nouns, and the critic are
|
||||
// host-supplied via Ports (see ports.go) so importing the kernel never drags in
|
||||
// a store or a battery.
|
||||
package run
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"log/slog"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
||||
)
|
||||
|
||||
// ErrShutdown is the cancellation cause set on mort's base lifecycle context
|
||||
// when the process is shutting down (SIGTERM after the drain window). The
|
||||
// agent executor uses it to distinguish a run interrupted by shutdown (which
|
||||
// should be left durable-recoverable) from a run that errored or hit its own
|
||||
// deadline (terminal).
|
||||
var ErrShutdown = errors.New("mort: shutting down")
|
||||
|
||||
// CleanupContextTimeout caps how long a run's post-completion cleanup ops
|
||||
// (budget commit, audit Close, attachment bookkeeping) may wait on
|
||||
// storage after detaching from the caller's — possibly already
|
||||
// cancelled — context. 10s is generous for a single-row UPDATE against
|
||||
// MySQL; longer suggests a hung connection the run goroutine shouldn't
|
||||
// keep waiting on. Both executors derive their cleanup contexts as
|
||||
// context.WithTimeout(context.WithoutCancel(ctx), CleanupContextTimeout).
|
||||
const CleanupContextTimeout = 10 * time.Second
|
||||
|
||||
// Reserved state-react lifecycle event keys, shared so both nouns surface
|
||||
// the same UX shape. Namespaced with double-underscores to make accidental
|
||||
// collision with a tool name near-impossible.
|
||||
const (
|
||||
StateReactStart = "__start__"
|
||||
StateReactEnd = "__end__"
|
||||
StateReactError = "__error__"
|
||||
StateReactBudgetExceeded = "__budget_exceeded__"
|
||||
)
|
||||
|
||||
// MergeCancellation returns a context cancelled when EITHER input is
|
||||
// cancelled, propagating the cancellation Cause from whichever fired. Used
|
||||
// by the lane preemption path (the lane's per-job ctx.Cause flows into the
|
||||
// run context) and by the runtime-detach path (process shutdown still
|
||||
// reaches a run whose deadline was reset after a lane wait). Always call
|
||||
// the returned cancel to release the watcher goroutine; it is also invoked
|
||||
// once when either input fires.
|
||||
func MergeCancellation(parent, secondary context.Context) (context.Context, context.CancelFunc) {
|
||||
merged, cancel := context.WithCancelCause(parent)
|
||||
go func() {
|
||||
select {
|
||||
case <-merged.Done():
|
||||
return
|
||||
case <-secondary.Done():
|
||||
cancel(context.Cause(secondary))
|
||||
}
|
||||
}()
|
||||
return merged, func() { cancel(nil) }
|
||||
}
|
||||
|
||||
// RunFinalizer is invoked at run finish so per-run tool state (open HTTP
|
||||
// streams, per-run code_exec counters, per-run search budgets) is released
|
||||
// and the process-lifetime maps keyed by run id don't grow unbounded.
|
||||
// Both executors fire their registered finalizers via FireFinalizers.
|
||||
type RunFinalizer interface {
|
||||
FinalizeRun(runID string)
|
||||
}
|
||||
|
||||
// FireFinalizers runs every finalizer for runID, isolating each behind a
|
||||
// panic-recover so one buggy finalizer can't take down the run goroutine
|
||||
// or skip the others. Safe to call with a nil/empty slice.
|
||||
func FireFinalizers(fs []RunFinalizer, runID string) {
|
||||
for _, f := range fs {
|
||||
if f == nil {
|
||||
continue
|
||||
}
|
||||
func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
slog.Error("runengine: run finalizer panicked",
|
||||
"run_id", runID, "panic", r)
|
||||
}
|
||||
}()
|
||||
f.FinalizeRun(runID)
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
// RunTally is the narrow live-progress source the RunStateAccessor reads —
|
||||
// the running token and tool-call counts for the in-flight run. The audit
|
||||
// battery's writer satisfies it; this interface is how the run kernel reads
|
||||
// live tallies without importing the audit package (the inversion of mort's
|
||||
// direct *skillaudit.Writer dependency).
|
||||
type RunTally interface {
|
||||
// TokenStats returns the running input, output, and thinking token totals.
|
||||
TokenStats() (in, out, thinking int64)
|
||||
// ToolCallsCount returns the number of tool calls executed so far.
|
||||
ToolCallsCount() int
|
||||
}
|
||||
|
||||
// RunStateAccessor is the per-run live-progress accessor the executor
|
||||
// stamps on Invocation.RunState before building the toolbox, so the
|
||||
// self-status tool can report iteration / tool-calls / tokens / elapsed for
|
||||
// the in-flight run. Construct with NewRunStateAccessor; the executor's step
|
||||
// observer calls SetIteration each loop.
|
||||
type RunStateAccessor struct {
|
||||
tally RunTally
|
||||
iter atomic.Int32
|
||||
maxIter int
|
||||
maxCalls int
|
||||
startedAt time.Time
|
||||
}
|
||||
|
||||
// NewRunStateAccessor builds the accessor. writer supplies the live token
|
||||
// + tool-call tallies; maxIter / maxCalls are the reported caps (0 =
|
||||
// uncapped); startedAt anchors the elapsed clock.
|
||||
func NewRunStateAccessor(tally RunTally, maxIter, maxCalls int, startedAt time.Time) *RunStateAccessor {
|
||||
return &RunStateAccessor{
|
||||
tally: tally,
|
||||
maxIter: maxIter,
|
||||
maxCalls: maxCalls,
|
||||
startedAt: startedAt,
|
||||
}
|
||||
}
|
||||
|
||||
// SetIteration records the current agent-loop iteration (called from the
|
||||
// executor's step observer).
|
||||
func (a *RunStateAccessor) SetIteration(iter int) { a.iter.Store(int32(iter)) }
|
||||
|
||||
// RunState satisfies tool.RunStateAccessor.
|
||||
func (a *RunStateAccessor) RunState() tool.RunState {
|
||||
in, out, think := a.tally.TokenStats()
|
||||
return tool.RunState{
|
||||
Iteration: int(a.iter.Load()),
|
||||
MaxIterations: a.maxIter,
|
||||
ToolCalls: a.tally.ToolCallsCount(),
|
||||
MaxToolCalls: a.maxCalls,
|
||||
InputTokens: in,
|
||||
OutputTokens: out,
|
||||
ThinkingTokens: think,
|
||||
ElapsedSeconds: int(time.Since(a.startedAt).Seconds()),
|
||||
}
|
||||
}
|
||||
+419
@@ -0,0 +1,419 @@
|
||||
package run
|
||||
|
||||
// steps.go — the per-run step emitter and the tool→step presentation
|
||||
// mapping. This is the single place that turns the executor's post-step
|
||||
// observer into ordered tool.Step records: one per tool call, each with a
|
||||
// stable id, an open-vocabulary kind, and a human present-tense summary
|
||||
// that flips running→complete/error.
|
||||
//
|
||||
// One source feeds two consumers (mirroring the OnEvent/OnToolEvent/
|
||||
// PostRunResult pattern): the live tool.Invocation.OnStep callback
|
||||
// (nil-safe) AND snapshot(), which the executor copies onto Result.Steps.
|
||||
// Because the Result accumulation does not depend on OnStep being set,
|
||||
// every surface — chat (JSON + SSE), Discord, cron, sub-agents — carries
|
||||
// steps; OnStep is needed only for live streaming.
|
||||
//
|
||||
// LIMITATION (current): majordomo exposes only a POST-step observer, so
|
||||
// the executor calls toolStart+toolEnd back-to-back after each tool has
|
||||
// already run. Steps are therefore recorded faithfully, but step.StartedAt
|
||||
// ≈ EndedAt and the intermediate "running" phase is never observable to a
|
||||
// live OnStep consumer. A pre-dispatch hook (wrapping each tool's handler
|
||||
// to emit toolStart before execution, like mort's state-react decorator)
|
||||
// is a follow-up that would restore real start timing + the running phase.
|
||||
// The emitter already supports that two-call shape — toolStart and toolEnd
|
||||
// are separate methods — so wiring it later is additive.
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
||||
)
|
||||
|
||||
// stepSummaryMaxLen caps the human summary length (section G size cap).
|
||||
// Detail is unused in v1 (no live detail source while replies are
|
||||
// generated blocking) so there is no Detail cap yet.
|
||||
const stepSummaryMaxLen = 200
|
||||
|
||||
// stepEmitter accumulates ordered steps for one run and fires the live
|
||||
// OnStep callback.
|
||||
//
|
||||
// Concurrency: touched ONLY from the agent-loop goroutine — the executor's
|
||||
// stepObserver (and, once a pre-dispatch hook is wired, that hook) run
|
||||
// there; majordomo executes a step's tool calls sequentially, and
|
||||
// sub-agents build their own Invocation so they never reach this
|
||||
// emitter. Same single-goroutine contract as the audit Writer and the
|
||||
// critic ProgressRecorder — no internal lock.
|
||||
type stepEmitter struct {
|
||||
onStep func(ctx context.Context, ev tool.StepEvent)
|
||||
now func() time.Time
|
||||
|
||||
seq int
|
||||
steps []tool.Step // ordered; the snapshot for Result.Steps
|
||||
byID map[string]int // step id -> index into steps
|
||||
pending map[string][]string // correlation key -> queued running ids (FIFO)
|
||||
}
|
||||
|
||||
// newStepEmitter returns an emitter that forwards to onStep (nil-safe).
|
||||
func newStepEmitter(onStep func(ctx context.Context, ev tool.StepEvent)) *stepEmitter {
|
||||
return &stepEmitter{
|
||||
onStep: onStep,
|
||||
now: time.Now,
|
||||
byID: map[string]int{},
|
||||
pending: map[string][]string{},
|
||||
}
|
||||
}
|
||||
|
||||
// corrKey correlates a "start" (name + raw args, no call id available at
|
||||
// the pre-dispatch hook) with its later "end" (the stepObserver has the
|
||||
// full call incl. id + the same raw args).
|
||||
func corrKey(name string, args json.RawMessage) string {
|
||||
return name + "\x00" + string(args)
|
||||
}
|
||||
|
||||
// toolStart records + emits the "start" of a tool call. Called from the
|
||||
// pre-dispatch hookToolbox closure, before the tool runs.
|
||||
func (e *stepEmitter) toolStart(ctx context.Context, name string, args json.RawMessage) {
|
||||
if e == nil {
|
||||
return
|
||||
}
|
||||
step := e.newStep(name, args)
|
||||
key := corrKey(name, args)
|
||||
e.pending[key] = append(e.pending[key], step.ID)
|
||||
e.fire(ctx, "start", step)
|
||||
}
|
||||
|
||||
// toolEnd records + emits the terminal "end" of a tool call. Called from
|
||||
// the stepObserver for each completed tool call. If no matching start was
|
||||
// seen (e.g. a tool with a nil handler the pre-dispatch hook skipped), a
|
||||
// start is synthesized so the step still appears.
|
||||
func (e *stepEmitter) toolEnd(ctx context.Context, call llm.ToolCall, result string, isError bool) {
|
||||
if e == nil {
|
||||
return
|
||||
}
|
||||
id := e.matchPending(call.Name, call.Arguments)
|
||||
if id == "" {
|
||||
id = e.newStep(call.Name, call.Arguments).ID
|
||||
}
|
||||
idx, ok := e.byID[id]
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
step := &e.steps[idx]
|
||||
end := e.now()
|
||||
step.EndedAt = &end
|
||||
if isError {
|
||||
step.Status = "error"
|
||||
} else {
|
||||
step.Status = "complete"
|
||||
}
|
||||
if s := summaryForEnd(call.Name, call.Arguments, result, isError); s != "" {
|
||||
step.Summary = s
|
||||
}
|
||||
e.fire(ctx, "end", *step)
|
||||
}
|
||||
|
||||
// newStep mints + appends a running step and returns it (by value).
|
||||
func (e *stepEmitter) newStep(name string, args json.RawMessage) tool.Step {
|
||||
e.seq++
|
||||
step := tool.Step{
|
||||
ID: fmt.Sprintf("s%d", e.seq),
|
||||
Kind: kindForTool(name),
|
||||
Title: name,
|
||||
Summary: summaryForStart(name, args),
|
||||
Status: "running",
|
||||
StartedAt: e.now(),
|
||||
}
|
||||
e.byID[step.ID] = len(e.steps)
|
||||
e.steps = append(e.steps, step)
|
||||
return step
|
||||
}
|
||||
|
||||
// matchPending pops the oldest running step id for (name, args). Falls back to
|
||||
// the OLDEST still-running step of the same tool name when the args don't
|
||||
// byte-match between start and end (e.g. JSON key reordering). FIFO on the
|
||||
// fallback too, consistent with the per-key queue pop above — closing the
|
||||
// oldest avoids mis-correlating concurrent same-named calls. Returns "" on no
|
||||
// match.
|
||||
func (e *stepEmitter) matchPending(name string, args json.RawMessage) string {
|
||||
key := corrKey(name, args)
|
||||
if q := e.pending[key]; len(q) > 0 {
|
||||
id := q[0]
|
||||
if len(q) == 1 {
|
||||
delete(e.pending, key)
|
||||
} else {
|
||||
e.pending[key] = q[1:]
|
||||
}
|
||||
return id
|
||||
}
|
||||
for i := 0; i < len(e.steps); i++ {
|
||||
if e.steps[i].Title == name && e.steps[i].Status == "running" {
|
||||
return e.steps[i].ID
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (e *stepEmitter) fire(ctx context.Context, phase string, step tool.Step) {
|
||||
if e.onStep == nil {
|
||||
return
|
||||
}
|
||||
e.onStep(ctx, tool.StepEvent{Phase: phase, Step: step})
|
||||
}
|
||||
|
||||
// snapshot returns a copy of the ordered, deduplicated step set for the
|
||||
// run Result. A step still "running" at run end (e.g. the run was
|
||||
// cancelled mid-tool-call) is reported as-is.
|
||||
func (e *stepEmitter) snapshot() []tool.Step {
|
||||
if e == nil || len(e.steps) == 0 {
|
||||
return nil
|
||||
}
|
||||
out := make([]tool.Step, len(e.steps))
|
||||
copy(out, e.steps)
|
||||
return out
|
||||
}
|
||||
|
||||
// kindForTool maps a tool name to an open-vocabulary step kind. Unknown
|
||||
// tools fall back to "tool" — never an error, just a generic step (the
|
||||
// client maps unknown kinds to a default icon). Loosely tracks the
|
||||
// catalog in pkg/skilltools/CLAUDE.md.
|
||||
func kindForTool(name string) string {
|
||||
switch name {
|
||||
case "web_search", "search_reddit", "wikipedia_summary":
|
||||
return "search"
|
||||
case "read_page", "read_pdf", "read_reddit", "read_video", "verify_url",
|
||||
"summary_summarise", "summarize", "file_get_text", "file_get_metadata",
|
||||
"http_get", "http_post", "http_get_stream", "http_stream_read":
|
||||
return "read"
|
||||
case "code_exec", "calculate":
|
||||
return "code"
|
||||
case "file_save", "file_get", "file_list", "file_delete", "file_search":
|
||||
return "file"
|
||||
case "kv_get", "kv_set", "kv_list", "kv_delete",
|
||||
"remember", "recall", "chatbot_get_memories":
|
||||
return "memory"
|
||||
case "query", "query_research", "deepresearch", "animate",
|
||||
"agent_invoke", "agent_invoke_parallel", "agent_spawn",
|
||||
"agent_spawn_parallel", "skill_invoke", "skill_invoke_parallel":
|
||||
return "delegate"
|
||||
case "think":
|
||||
return "thinking"
|
||||
default:
|
||||
switch {
|
||||
case strings.HasPrefix(name, "image") || strings.Contains(name, "draw"):
|
||||
return "image"
|
||||
default:
|
||||
return "tool"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// summaryForStart builds the human present-tense running summary. It
|
||||
// derives specifics from safe arg fields only; secret-bearing tools
|
||||
// (mcp_call, email_send, http_*) are summarized without echoing args.
|
||||
func summaryForStart(name string, args json.RawMessage) string {
|
||||
var s string
|
||||
switch name {
|
||||
case "web_search":
|
||||
if q := argString(args, "query", "q"); q != "" {
|
||||
s = fmt.Sprintf("Searching the web for %q", q)
|
||||
} else {
|
||||
s = "Searching the web"
|
||||
}
|
||||
case "search_reddit":
|
||||
if q := argString(args, "query", "q"); q != "" {
|
||||
s = fmt.Sprintf("Searching Reddit for %q", q)
|
||||
} else {
|
||||
s = "Searching Reddit"
|
||||
}
|
||||
case "wikipedia_summary":
|
||||
if q := argString(args, "query", "title"); q != "" {
|
||||
s = fmt.Sprintf("Looking up %q on Wikipedia", q)
|
||||
} else {
|
||||
s = "Looking up Wikipedia"
|
||||
}
|
||||
case "read_page", "read_pdf", "read_reddit", "read_video", "verify_url":
|
||||
if u := argString(args, "url", "post", "page"); u != "" {
|
||||
s = "Reading " + hostOf(u)
|
||||
} else {
|
||||
s = "Reading a page"
|
||||
}
|
||||
case "http_get", "http_post", "http_get_stream":
|
||||
// Show host only — a full URL can embed credentials/tokens.
|
||||
if u := argString(args, "url"); u != "" {
|
||||
s = "Fetching " + hostOf(u)
|
||||
} else {
|
||||
s = "Making an HTTP request"
|
||||
}
|
||||
case "summary_summarise", "summarize":
|
||||
s = "Summarizing text"
|
||||
case "translate":
|
||||
if lang := argString(args, "target_lang", "target_language", "lang"); lang != "" {
|
||||
s = "Translating to " + lang
|
||||
} else {
|
||||
s = "Translating text"
|
||||
}
|
||||
case "code_exec":
|
||||
s = "Running code"
|
||||
case "calculate":
|
||||
if q := argString(args, "query", "expression", "expr"); q != "" {
|
||||
s = "Calculating " + truncateStep(q, 60)
|
||||
} else {
|
||||
s = "Calculating"
|
||||
}
|
||||
case "remember":
|
||||
// Never echo the stored value.
|
||||
s = "Saving a memory"
|
||||
case "recall", "chatbot_get_memories":
|
||||
s = "Recalling memories"
|
||||
case "kv_get", "kv_list":
|
||||
s = "Reading saved data"
|
||||
case "kv_set":
|
||||
s = "Saving data"
|
||||
case "kv_delete":
|
||||
s = "Deleting saved data"
|
||||
case "file_save":
|
||||
if n := argString(args, "name", "filename"); n != "" {
|
||||
s = "Saving file " + truncateStep(n, 60)
|
||||
} else {
|
||||
s = "Saving a file"
|
||||
}
|
||||
case "file_get", "file_get_text", "file_get_metadata":
|
||||
s = "Reading a file"
|
||||
case "file_list", "file_search":
|
||||
s = "Listing files"
|
||||
case "query", "query_research":
|
||||
if q := argString(args, "query", "question", "prompt", "task"); q != "" {
|
||||
s = "Researching " + truncateStep(q, 80)
|
||||
} else {
|
||||
s = "Researching"
|
||||
}
|
||||
case "deepresearch":
|
||||
s = "Running deep research"
|
||||
case "animate":
|
||||
s = "Generating an animation"
|
||||
case "agent_invoke", "agent_spawn":
|
||||
if a := argString(args, "agent", "agent_name", "name"); a != "" {
|
||||
s = "Delegating to " + a
|
||||
} else {
|
||||
s = "Delegating to a sub-agent"
|
||||
}
|
||||
case "agent_invoke_parallel", "agent_spawn_parallel":
|
||||
s = "Delegating to sub-agents"
|
||||
case "skill_invoke":
|
||||
if sk := argString(args, "skill_name", "skill", "name"); sk != "" {
|
||||
s = "Running skill " + sk
|
||||
} else {
|
||||
s = "Running a skill"
|
||||
}
|
||||
case "skill_invoke_parallel":
|
||||
s = "Running skills in parallel"
|
||||
case "think":
|
||||
s = "Thinking"
|
||||
case "mcp_call":
|
||||
// Redact: MCP args frequently carry secrets. Name server/tool only.
|
||||
srv, tl := argString(args, "server"), argString(args, "tool")
|
||||
switch {
|
||||
case srv != "" && tl != "":
|
||||
s = fmt.Sprintf("Calling %s/%s", srv, tl)
|
||||
case srv != "":
|
||||
s = "Calling " + srv
|
||||
default:
|
||||
s = "Calling an MCP tool"
|
||||
}
|
||||
case "email_send":
|
||||
// Redact recipients + body.
|
||||
s = "Sending an email"
|
||||
default:
|
||||
s = "Using " + name
|
||||
}
|
||||
return truncateStep(s, stepSummaryMaxLen)
|
||||
}
|
||||
|
||||
// summaryForEnd optionally upgrades the summary to a cheap result phrase.
|
||||
// Returns "" to keep the running summary (the caller then just flips the
|
||||
// status). Never returns a phrase derived from raw result bytes.
|
||||
func summaryForEnd(name string, _ json.RawMessage, result string, isError bool) string {
|
||||
if isError {
|
||||
return ""
|
||||
}
|
||||
switch name {
|
||||
case "web_search", "search_reddit":
|
||||
if n := countResults(result); n >= 0 {
|
||||
return fmt.Sprintf("Found %d result%s", n, plural(n))
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// argString pulls the first present non-empty string field from a tool's
|
||||
// raw JSON args, trying keys in order. Returns "" when none parse.
|
||||
func argString(args json.RawMessage, keys ...string) string {
|
||||
if len(args) == 0 {
|
||||
return ""
|
||||
}
|
||||
var m map[string]any
|
||||
if err := json.Unmarshal(args, &m); err != nil {
|
||||
return ""
|
||||
}
|
||||
for _, k := range keys {
|
||||
if v, ok := m[k]; ok {
|
||||
if s, ok := v.(string); ok && strings.TrimSpace(s) != "" {
|
||||
return strings.TrimSpace(s)
|
||||
}
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// countResults parses a v11-style {"results":[...]} envelope and returns
|
||||
// the count, or -1 when the shape doesn't match.
|
||||
func countResults(result string) int {
|
||||
if strings.TrimSpace(result) == "" {
|
||||
return -1
|
||||
}
|
||||
var env struct {
|
||||
Results []json.RawMessage `json:"results"`
|
||||
}
|
||||
if err := json.Unmarshal([]byte(result), &env); err != nil || env.Results == nil {
|
||||
return -1
|
||||
}
|
||||
return len(env.Results)
|
||||
}
|
||||
|
||||
// hostOf returns the bare host (no leading www.) of a URL, or a short
|
||||
// form of the raw string when it doesn't parse as a URL.
|
||||
func hostOf(raw string) string {
|
||||
if u, err := url.Parse(raw); err == nil && u.Host != "" {
|
||||
return strings.TrimPrefix(u.Host, "www.")
|
||||
}
|
||||
return truncateStep(raw, 60)
|
||||
}
|
||||
|
||||
// truncateStep rune-safely caps s to max, appending an ellipsis when cut.
|
||||
func truncateStep(s string, max int) string {
|
||||
if max <= 0 {
|
||||
return ""
|
||||
}
|
||||
r := []rune(s)
|
||||
if len(r) <= max {
|
||||
return s
|
||||
}
|
||||
if max == 1 {
|
||||
return string(r[:1])
|
||||
}
|
||||
return string(r[:max-1]) + "…"
|
||||
}
|
||||
|
||||
func plural(n int) string {
|
||||
if n == 1 {
|
||||
return ""
|
||||
}
|
||||
return "s"
|
||||
}
|
||||
@@ -0,0 +1,84 @@
|
||||
package run
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
llm "gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||
)
|
||||
|
||||
// SubmitCapture records the output a run's `submit` tool received.
|
||||
//
|
||||
// Why this exists: legacy agentkit injected a synthetic `submit` tool and
|
||||
// ended the loop when it fired; years of mort system prompts (agent
|
||||
// YAMLs, skill manifests, the executors' platform headers) teach the
|
||||
// model to "call submit with your final answer". majordomo's agent loop
|
||||
// has no submit concept — it ends when the model replies WITHOUT tool
|
||||
// calls. Dropping submit cold would make every prompt-trained model
|
||||
// burn turns on "unknown tool \"submit\"" errors.
|
||||
//
|
||||
// The compatibility shape: the executors add NewSubmitTool's tool to
|
||||
// every run's toolset (unless the palette already defines a `submit`).
|
||||
// The handler records the FIRST submitted answer and tells the model
|
||||
// the answer was accepted so its next turn is a bare reply (which ends
|
||||
// the loop naturally). After the run, the executor consults
|
||||
// Output(loopOutput, runErr): a captured submission wins over an empty
|
||||
// or budget-exhausted ending, so a model that submits on its final
|
||||
// allowed step still produces its answer instead of ErrMaxSteps.
|
||||
type SubmitCapture struct {
|
||||
mu sync.Mutex
|
||||
output string
|
||||
called bool
|
||||
}
|
||||
|
||||
// Record stores the first submitted answer; later calls are ignored
|
||||
// (matching legacy agentkit's "multiple calls keep the first" contract).
|
||||
func (c *SubmitCapture) Record(output string) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if c.called {
|
||||
return
|
||||
}
|
||||
c.called = true
|
||||
c.output = output
|
||||
}
|
||||
|
||||
// Submitted returns the captured answer and whether submit fired.
|
||||
func (c *SubmitCapture) Submitted() (string, bool) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
return c.output, c.called
|
||||
}
|
||||
|
||||
// Output resolves the run's final output: the submitted answer when the
|
||||
// model called submit (parity with legacy agentkit, where submit's argument
|
||||
// WAS the run output), otherwise the loop's own final text. resolvedErr
|
||||
// is nil when a submission exists — a run that submitted its answer and
|
||||
// then ran out of steps (or timed out composing the courtesy
|
||||
// confirmation turn) is a SUCCESS, not an error.
|
||||
func (c *SubmitCapture) Output(loopOutput string, runErr error) (output string, resolvedErr error) {
|
||||
if out, ok := c.Submitted(); ok {
|
||||
return out, nil
|
||||
}
|
||||
return loopOutput, runErr
|
||||
}
|
||||
|
||||
// submitArgs mirrors legacy agentkit's synthetic submit tool schema so
|
||||
// models prompted under the old contract emit compatible calls.
|
||||
type submitArgs struct {
|
||||
Output string `json:"output" description:"The final answer, summary, or output for this task."`
|
||||
}
|
||||
|
||||
// NewSubmitTool builds the compatibility `submit` tool bound to the
|
||||
// given capture. Both executors (skill + agent) install one per run.
|
||||
func NewSubmitTool(capture *SubmitCapture) llm.Tool {
|
||||
return llm.DefineTool[submitArgs](
|
||||
"submit",
|
||||
"Submit your final answer or output to end this task. Call exactly once when you are done.",
|
||||
func(_ context.Context, args submitArgs) (any, error) {
|
||||
capture.Record(strings.TrimSpace(args.Output))
|
||||
return "Final answer recorded. Do not call any more tools; reply now with a brief closing message.", nil
|
||||
},
|
||||
)
|
||||
}
|
||||
Reference in New Issue
Block a user