P2: run kernel + run.Ports inversion — executus is runnable #2
@@ -1,10 +1,11 @@
|
|||||||
# Gadfly — agentic adversarial PR reviewer (https://gitea.stevedudenhoeffer.com/steve/gadfly).
|
# Gadfly — agentic adversarial PR reviewer (https://gitea.stevedudenhoeffer.com/steve/gadfly).
|
||||||
#
|
#
|
||||||
# Runs the published Gadfly image (pinned to :v1) as a specialist swarm and posts
|
# Runs the published Gadfly image (pinned to an immutable :sha- tag — act_runner
|
||||||
|
# caches :latest, and this build is what carries foreman provider-type support)
|
||||||
|
# as a specialist swarm and posts
|
||||||
# ONE consolidated review comment as gitea-actions. Advisory only — never blocks a
|
# ONE consolidated review comment as gitea-actions. Advisory only — never blocks a
|
||||||
# merge. This reviews executus PRs (same setup as mort: m1/m5 locals + 2 cloud,
|
# merge. This reviews executus PRs with 3 ollama-cloud models (3-lens suite). Gadfly
|
||||||
# 3-lens suite). Gadfly is a simple system — treat its findings as advisory and
|
# is a simple system — findings are advisory; always double-check before acting.
|
||||||
# double-check before acting.
|
|
||||||
|
|
||||||
name: Adversarial Review (Gadfly)
|
name: Adversarial Review (Gadfly)
|
||||||
|
|
||||||
@@ -40,24 +41,21 @@ jobs:
|
|||||||
|| github.actor == 'fizi'
|
|| github.actor == 'fizi'
|
||||||
|| github.actor == 'dazed'))
|
|| github.actor == 'dazed'))
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
# Full fleet (2 cloud + 2 local Macs, all running concurrently) reviewing
|
# 3 cloud models, all concurrent, 3-lens suite. ~12 min typical.
|
||||||
# every PR with the 3-lens suite — the slow local lanes dominate wall time.
|
timeout-minutes: 30
|
||||||
timeout-minutes: 90
|
|
||||||
steps:
|
steps:
|
||||||
- uses: docker://gitea.stevedudenhoeffer.com/steve/gadfly:v1
|
- uses: docker://gitea.stevedudenhoeffer.com/steve/gadfly:sha-6e3a83c
|
||||||
env:
|
env:
|
||||||
GITEA_API: ${{ github.server_url }}/api/v1/repos/${{ github.repository }}
|
GITEA_API: ${{ github.server_url }}/api/v1/repos/${{ github.repository }}
|
||||||
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
||||||
OLLAMA_CLOUD_API_KEY: ${{ secrets.OLLAMA_CLOUD_API_KEY }}
|
OLLAMA_CLOUD_API_KEY: ${{ secrets.OLLAMA_CLOUD_API_KEY }}
|
||||||
# Local Ollama boxes (each its own lane, cap 1). NOTE: both Macs must be
|
# executus uses CLOUD MODELS ONLY. The local Macs (m1/m5) were dropped:
|
||||||
# awake/reachable for their reviews to run; if a box is offline, that
|
# on a P2-review measurement they took 26–29 min (with lens timeouts)
|
||||||
# model's comment shows an error and the others still post.
|
# and contributed ZERO real findings — the two cloud models found every
|
||||||
GADFLY_ENDPOINT_M1PRO: "ollama|http://192.168.0.175:11434"
|
# genuine bug in 6–12 min. Cloud-only is faster AND higher-signal.
|
||||||
GADFLY_ENDPOINT_M5MAX: "ollama|http://192.168.0.173:11434"
|
# 3 cloud models, one consolidated comment each, all run in parallel.
|
||||||
# 2 cloud (parallel) + M1 Pro + M5 Max — one consolidated comment each.
|
GADFLY_MODELS: "minimax-m3:cloud,deepseek-v4-flash:cloud,glm-5.2:cloud"
|
||||||
GADFLY_MODELS: "minimax-m3:cloud,deepseek-v4-flash:cloud,m1pro/qwen3:14b,m5max/qwen3.6:35b-mlx"
|
GADFLY_PROVIDER_CONCURRENCY: "ollama-cloud=3"
|
||||||
# cloud runs 2 at once; each Mac one at a time; all three lanes parallel.
|
|
||||||
GADFLY_PROVIDER_CONCURRENCY: "ollama-cloud=2,m1pro=1,m5max=1"
|
|
||||||
# Default => the 3-lens suite (security, correctness, error-handling).
|
# Default => the 3-lens suite (security, correctness, error-handling).
|
||||||
# Set the repo var GADFLY_SPECIALISTS to override (csv / "all" / "auto").
|
# Set the repo var GADFLY_SPECIALISTS to override (csv / "all" / "auto").
|
||||||
GADFLY_SPECIALISTS: ${{ vars.GADFLY_SPECIALISTS || 'security,correctness,error-handling' }}
|
GADFLY_SPECIALISTS: ${{ vars.GADFLY_SPECIALISTS || 'security,correctness,error-handling' }}
|
||||||
|
|||||||
@@ -43,8 +43,13 @@ CORE (majordomo + stdlib):
|
|||||||
fanout/ programmatic N×M swarm [P0 ✓]
|
fanout/ programmatic N×M swarm [P0 ✓]
|
||||||
deliver/ output egress seam (+ Discard/Stdout) [P0 ✓]
|
deliver/ output egress seam (+ Discard/Stdout) [P0 ✓]
|
||||||
identity/ caller identity seams [P0 ✓]
|
identity/ caller identity seams [P0 ✓]
|
||||||
run/ progress bridge now; the executor kernel + [P0 partial]
|
run/ run.Executor is RUNNABLE: model-resolve + [P2 core ✓]
|
||||||
nil-safe Ports + RunnableAgent later [P2]
|
toolbox + majordomo loop + compaction +
|
||||||
|
run-bounding (V10 detached timeout) + step/
|
||||||
|
audit observers + Budget gate; RunnableAgent
|
||||||
|
DTO + nil-safe run.Ports. Follow-ups: wire
|
||||||
|
Critic/Checkpointer/PaletteSource/Delivery,
|
||||||
|
Phases, and the no-tools direct path [P2]
|
||||||
dispatchguard/ loop/depth/fan-out caps [P0 ✓]
|
dispatchguard/ loop/depth/fan-out caps [P0 ✓]
|
||||||
pendingattach/ attachment dedupe [P0 ✓]
|
pendingattach/ attachment dedupe [P0 ✓]
|
||||||
tool/ registry + 3-stage permissions + ssrf [P1 ✓]
|
tool/ registry + 3-stage permissions + ssrf [P1 ✓]
|
||||||
@@ -52,7 +57,7 @@ CORE (majordomo + stdlib):
|
|||||||
(convar->config.Source; UsageSink/TraceSink seams; GenerateWith[T]
|
(convar->config.Source; UsageSink/TraceSink seams; GenerateWith[T]
|
||||||
structured output — no separate structured/ pkg)
|
structured output — no separate structured/ pkg)
|
||||||
llmmeta/ shared meta-LLM helper over model/ [P1 ✓]
|
llmmeta/ shared meta-LLM helper over model/ [P1 ✓]
|
||||||
compact/ context compactor (WithCompactor hook) [P2]
|
compact/ context compactor (WithCompactor hook) [P2 ✓]
|
||||||
tools/{web,net,store,compose,meta,comms} generic tools [P3]
|
tools/{web,net,store,compose,meta,comms} generic tools [P3]
|
||||||
|
|
||||||
BATTERIES (opt-in siblings, each nil-safe + a default):
|
BATTERIES (opt-in siblings, each nil-safe + a default):
|
||||||
|
|||||||
@@ -31,15 +31,23 @@ bot) — mort and gadfly are the first two consumers (heavy and light). See
|
|||||||
|
|
||||||
[mort]: https://gitea.stevedudenhoeffer.com/steve/mort
|
[mort]: https://gitea.stevedudenhoeffer.com/steve/mort
|
||||||
|
|
||||||
**Available today (P0):**
|
**Available today:**
|
||||||
|
|
||||||
|
- `run/` — **executus is runnable.** `run.Executor` ties model resolution, the
|
||||||
|
tool registry, majordomo's agent loop, context compaction, run-bounding, and
|
||||||
|
step/audit instrumentation into one `Run(ctx, RunnableAgent, inv) Result`, with
|
||||||
|
every host concern behind a nil-safe `run.Ports` (Audit/Budget/Critic/
|
||||||
|
Checkpointer/PaletteSource/Delivery). See `examples/minimal`.
|
||||||
|
- `model/` — config-driven tier resolution + failover over majordomo, with
|
||||||
|
pluggable `UsageSink`/`TraceSink` and `GenerateWith[T]` structured output.
|
||||||
|
- `tool/` — the tool registry + 3-stage permission model + SSRF guard.
|
||||||
|
- `compact/` — the per-run context compactor.
|
||||||
- `lane/` — bounded worker pool with fair-share queueing (run- and
|
- `lane/` — bounded worker pool with fair-share queueing (run- and
|
||||||
provider-concurrency).
|
provider-concurrency).
|
||||||
- `fanout/` — programmatic N×M swarm with bounded global + per-key concurrency.
|
- `fanout/` — programmatic N×M swarm with bounded global + per-key concurrency.
|
||||||
- `config/` — the host config seam (`Source`) with an env-var default.
|
- `config/`, `deliver/`, `identity/` — host seams (config / output / identity),
|
||||||
- `deliver/` — the output-egress seam with `Discard`/`Stdout` defaults.
|
each with a shipped default.
|
||||||
- `identity/` — caller-identity seams (`AdminPolicy`, `MemberResolver`).
|
- `dispatchguard/`, `pendingattach/` — run-safety primitives.
|
||||||
- `dispatchguard/`, `pendingattach/`, `run/progress.go` — run-safety primitives.
|
|
||||||
|
|
||||||
## Design
|
## Design
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,369 @@
|
|||||||
|
// V15.2 context compactor (re-based on majordomo).
|
||||||
|
//
|
||||||
|
// Why: the agent loop accumulates tool results indefinitely. A
|
||||||
|
// research-heavy run with many web_search / read_page / http_get
|
||||||
|
// results easily crosses 200K tokens and trips the model's HTTP-400
|
||||||
|
// "prompt too long" rejection mid-run (observed at 410K tokens on
|
||||||
|
// qwen3-coder:480b which has a 262K cap). majordomo's agent loop calls
|
||||||
|
// the compactor with the full message slice before every model call;
|
||||||
|
// the compactor returns a shorter slice that preserves the system
|
||||||
|
// prompt + recent messages, with the middle range replaced by a
|
||||||
|
// synthetic summary.
|
||||||
|
//
|
||||||
|
// Strategy (unchanged from the agentkit era):
|
||||||
|
// - Keep any leading system message verbatim. (Under majordomo the
|
||||||
|
// system prompt normally travels in Request.System, not in the
|
||||||
|
// message slice, so this is defensive.)
|
||||||
|
// - Keep the last KeepRecent messages verbatim. This ensures the
|
||||||
|
// agent has fresh tool state to act on; compacting too aggressively
|
||||||
|
// would strip the in-flight context it needs to make the next
|
||||||
|
// decision.
|
||||||
|
// - Compress the middle range via a single fast-tier LLM call that
|
||||||
|
// receives the middle messages as raw text and produces a paragraph
|
||||||
|
// summary (URLs visited, key findings, file_ids created, what the
|
||||||
|
// agent is trying to accomplish).
|
||||||
|
// - Replace the middle range with one synthetic user-role message
|
||||||
|
// containing the summary. (user-role chosen because tool-result-role
|
||||||
|
// would be ambiguous without a matching tool_call_id.)
|
||||||
|
//
|
||||||
|
// What moved in the majordomo conversion:
|
||||||
|
// - The token-threshold gate lives HERE now. agentkit estimated
|
||||||
|
// tokens and only invoked the compactor past a configured
|
||||||
|
// threshold; majordomo's hook fires before every model call, so
|
||||||
|
// the threshold check (estimateTokens vs the per-run threshold the
|
||||||
|
// executor computes from the model's context limit) is the
|
||||||
|
// compactor's first step.
|
||||||
|
// - The compactor is per-run STATEFUL: majordomo does not replace
|
||||||
|
// the loop's internal transcript with the compacted slice (the
|
||||||
|
// hook shapes only what is SENT), so without memory the middle
|
||||||
|
// would be re-summarised from scratch on every step past the
|
||||||
|
// threshold. The state remembers how far the transcript has been
|
||||||
|
// folded into the running summary and folds the previous summary
|
||||||
|
// into the next one instead of re-paying for it.
|
||||||
|
//
|
||||||
|
// Failure path: any error (LLM unavailable, malformed response, etc.)
|
||||||
|
// is returned to the agent loop, which treats compactor errors as
|
||||||
|
// non-fatal and sends the original slice — if the next model call hits
|
||||||
|
// the provider's limit, the existing HTTP-400 error path takes over.
|
||||||
|
|
||||||
|
package compact
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ModelResolver resolves a tier/spec to a usable llm.Model (and an enriched
|
||||||
|
// context for usage attribution). model.ParseModelForContext satisfies it.
|
||||||
|
type ModelResolver func(ctx context.Context, tier string) (context.Context, llm.Model, error)
|
||||||
|
|
||||||
|
// Compactor is the per-run compaction hook handed to the agent loop
|
||||||
|
// (matches the signature agent.WithCompactor expects).
|
||||||
|
type Compactor = func(ctx context.Context, msgs []llm.Message) ([]llm.Message, error)
|
||||||
|
|
||||||
|
// CompactionEvent describes one fired compaction so the executor can
|
||||||
|
// log it to skill_run_logs ("compaction_fired").
|
||||||
|
type CompactionEvent struct {
|
||||||
|
// MessagesBefore/After count the messages that would have been
|
||||||
|
// sent without/with this compaction.
|
||||||
|
MessagesBefore int
|
||||||
|
MessagesAfter int
|
||||||
|
// TokensBefore/After are the estimateTokens values for the same
|
||||||
|
// two slices.
|
||||||
|
TokensBefore int
|
||||||
|
TokensAfter int
|
||||||
|
}
|
||||||
|
|
||||||
|
// CompactorFactory mints a fresh per-run Compactor bound to a token
|
||||||
|
// threshold. onFire (nil-safe) observes every compaction that actually
|
||||||
|
// fires. A non-positive threshold yields a pass-through compactor.
|
||||||
|
type CompactorFactory func(thresholdTokens int, onFire func(CompactionEvent)) Compactor
|
||||||
|
|
||||||
|
// CompactorConfig controls the v15.2 compactor's behaviour. Construct
|
||||||
|
// in the host's wiring code (mort.go, in mort's case) from config values + the executor's ModelResolver.
|
||||||
|
type CompactorConfig struct {
|
||||||
|
// Models resolves a model spec (typically "fast" or a specific tier)
|
||||||
|
// to an llm.Model. Required; nil disables compaction.
|
||||||
|
Models ModelResolver
|
||||||
|
|
||||||
|
// SummarizerTier is the model tier used for the compression LLM call.
|
||||||
|
// Production default "fast"; an admin may set "haiku" or a specific
|
||||||
|
// model spec. Empty falls back to "fast".
|
||||||
|
SummarizerTier string
|
||||||
|
|
||||||
|
// KeepRecent is the number of trailing messages preserved verbatim.
|
||||||
|
// Default 8. Lower values compact more aggressively (the next loop
|
||||||
|
// iteration sees less recent context); higher values keep more.
|
||||||
|
KeepRecent int
|
||||||
|
|
||||||
|
// SummaryWordCap bounds the LLM-generated summary length. Default
|
||||||
|
// 200 words ≈ 800 chars ≈ 200 tokens — small enough that the
|
||||||
|
// compaction always shrinks the slice meaningfully.
|
||||||
|
SummaryWordCap int
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewCompactor returns a CompactorFactory implementing the middle-range
|
||||||
|
// summarisation strategy. nil cfg.Models returns a factory of no-op
|
||||||
|
// compactors that always return the input unchanged (degrades to
|
||||||
|
// v15.1 behaviour).
|
||||||
|
func NewCompactor(cfg CompactorConfig) CompactorFactory {
|
||||||
|
if cfg.Models == nil {
|
||||||
|
return func(int, func(CompactionEvent)) Compactor {
|
||||||
|
return func(_ context.Context, msgs []llm.Message) ([]llm.Message, error) {
|
||||||
|
return msgs, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if cfg.SummarizerTier == "" {
|
||||||
|
cfg.SummarizerTier = "fast"
|
||||||
|
}
|
||||||
|
if cfg.KeepRecent <= 0 {
|
||||||
|
cfg.KeepRecent = 8
|
||||||
|
}
|
||||||
|
if cfg.SummaryWordCap <= 0 {
|
||||||
|
cfg.SummaryWordCap = 200
|
||||||
|
}
|
||||||
|
return func(threshold int, onFire func(CompactionEvent)) Compactor {
|
||||||
|
st := &compactionState{}
|
||||||
|
return func(ctx context.Context, msgs []llm.Message) ([]llm.Message, error) {
|
||||||
|
return compactIfNeeded(ctx, cfg, st, threshold, onFire, msgs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// compactionState is the per-run fold memory: msgs[:prefixEnd]
|
||||||
|
// (excluding a leading system message) are represented by summaryText
|
||||||
|
// in the rendered slice. Guarded by mu for safety although the agent
|
||||||
|
// loop invokes the hook from a single goroutine.
|
||||||
|
type compactionState struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
prefixEnd int
|
||||||
|
summaryText string
|
||||||
|
}
|
||||||
|
|
||||||
|
// compactIfNeeded is the workhorse: render the transcript with any
|
||||||
|
// existing fold applied, check the threshold, and fold more of the
|
||||||
|
// middle into the summary when the rendered size still exceeds it.
|
||||||
|
func compactIfNeeded(ctx context.Context, cfg CompactorConfig, st *compactionState,
|
||||||
|
threshold int, onFire func(CompactionEvent), msgs []llm.Message) ([]llm.Message, error) {
|
||||||
|
st.mu.Lock()
|
||||||
|
defer st.mu.Unlock()
|
||||||
|
|
||||||
|
rendered := renderCompacted(st, msgs)
|
||||||
|
if threshold <= 0 {
|
||||||
|
return rendered, nil
|
||||||
|
}
|
||||||
|
tokensBefore := estimateTokens(rendered)
|
||||||
|
if tokensBefore < threshold {
|
||||||
|
return rendered, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Determine the new middle range to fold: everything between what
|
||||||
|
// is already summarised (or the optional leading system message)
|
||||||
|
// and the KeepRecent tail.
|
||||||
|
startMiddle := st.prefixEnd
|
||||||
|
if startMiddle == 0 && len(msgs) > 0 && msgs[0].Role == llm.RoleSystem {
|
||||||
|
startMiddle = 1
|
||||||
|
}
|
||||||
|
endMiddle := len(msgs) - cfg.KeepRecent
|
||||||
|
if endMiddle <= startMiddle {
|
||||||
|
// Nothing new to fold (the tail alone exceeds the threshold).
|
||||||
|
// Return the rendered slice; the model call may still succeed.
|
||||||
|
return rendered, nil
|
||||||
|
}
|
||||||
|
middle := msgs[startMiddle:endMiddle]
|
||||||
|
|
||||||
|
summary, err := summariseMiddle(ctx, cfg, st.summaryText, middle)
|
||||||
|
if err != nil {
|
||||||
|
// Non-fatal upstream: the agent loop sends the ORIGINAL slice. Return
|
||||||
|
// msgs, not `rendered` — on a second+ compaction `rendered` already
|
||||||
|
// carries a prior synthetic summary, which is not the documented
|
||||||
|
// "original slice" the loop expects on a compactor error.
|
||||||
|
return msgs, fmt.Errorf("compactor: summarise middle: %w", err)
|
||||||
|
}
|
||||||
|
st.summaryText = summary
|
||||||
|
st.prefixEnd = endMiddle
|
||||||
|
|
||||||
|
out := renderCompacted(st, msgs)
|
||||||
|
if onFire != nil {
|
||||||
|
onFire(CompactionEvent{
|
||||||
|
MessagesBefore: len(rendered),
|
||||||
|
MessagesAfter: len(out),
|
||||||
|
TokensBefore: tokensBefore,
|
||||||
|
TokensAfter: estimateTokens(out),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return out, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// renderCompacted applies the fold state to msgs: [optional system] +
|
||||||
|
// [synthetic summary] + msgs[prefixEnd:]. With no fold yet, msgs is
|
||||||
|
// returned unchanged.
|
||||||
|
func renderCompacted(st *compactionState, msgs []llm.Message) []llm.Message {
|
||||||
|
if st.prefixEnd <= 0 || st.prefixEnd > len(msgs) {
|
||||||
|
return msgs
|
||||||
|
}
|
||||||
|
tail := msgs[st.prefixEnd:]
|
||||||
|
out := make([]llm.Message, 0, len(tail)+2)
|
||||||
|
if msgs[0].Role == llm.RoleSystem {
|
||||||
|
out = append(out, msgs[0])
|
||||||
|
}
|
||||||
|
out = append(out, llm.UserText(
|
||||||
|
"[CONTEXT COMPACTED] The earlier portion of this conversation was summarised "+
|
||||||
|
"to fit the model's context window. Summary:\n\n"+st.summaryText+
|
||||||
|
"\n\nResume from the recent messages below."))
|
||||||
|
out = append(out, tail...)
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
// summariseMiddle composes a "compress this transcript" prompt and
|
||||||
|
// fires one fast-tier LLM call. prevSummary (may be empty) is the
|
||||||
|
// running summary from earlier compactions; it is folded into the new
|
||||||
|
// summary so prior context is not lost.
|
||||||
|
func summariseMiddle(ctx context.Context, cfg CompactorConfig, prevSummary string, middle []llm.Message) (string, error) {
|
||||||
|
if len(middle) == 0 {
|
||||||
|
return "", errors.New("compactor: empty middle range")
|
||||||
|
}
|
||||||
|
modelCtx, model, err := cfg.Models(ctx, cfg.SummarizerTier)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("compactor: resolve summarizer model %q: %w", cfg.SummarizerTier, err)
|
||||||
|
}
|
||||||
|
if model == nil {
|
||||||
|
return "", errors.New("compactor: summarizer model resolved to nil")
|
||||||
|
}
|
||||||
|
if modelCtx != nil {
|
||||||
|
ctx = modelCtx
|
||||||
|
}
|
||||||
|
|
||||||
|
var prior string
|
||||||
|
if strings.TrimSpace(prevSummary) != "" {
|
||||||
|
prior = "AN EARLIER PORTION WAS ALREADY SUMMARISED AS:\n" + prevSummary +
|
||||||
|
"\n\nFold that summary into your new one — its facts must survive.\n\n"
|
||||||
|
}
|
||||||
|
transcript := renderTranscript(middle)
|
||||||
|
prompt := fmt.Sprintf(
|
||||||
|
"You are compressing an in-flight agent's conversation transcript so the agent "+
|
||||||
|
"can continue working without blowing its model context. The transcript below is "+
|
||||||
|
"a sequence of tool calls and their results. Produce a single paragraph (under %d words) "+
|
||||||
|
"that captures:\n"+
|
||||||
|
" - WHAT the agent has been trying to accomplish.\n"+
|
||||||
|
" - WHICH URLs were visited / fetched (list inline, comma-separated).\n"+
|
||||||
|
" - KEY findings or decisions (factual results the agent will need later).\n"+
|
||||||
|
" - ANY file_ids or KV keys the agent created — these are persistent state references the agent must keep.\n"+
|
||||||
|
" - ANY errors or dead-ends that the agent should not re-try.\n"+
|
||||||
|
"DO NOT include verbose HTTP headers, tool-call metadata, error stack traces, or repetitive content. "+
|
||||||
|
"DO NOT add commentary or markdown headers. Output prose only.\n\n"+
|
||||||
|
"%sTRANSCRIPT TO COMPRESS:\n%s",
|
||||||
|
cfg.SummaryWordCap,
|
||||||
|
prior,
|
||||||
|
transcript,
|
||||||
|
)
|
||||||
|
|
||||||
|
resp, err := model.Generate(ctx, llm.Request{Messages: []llm.Message{llm.UserText(prompt)}})
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("compactor: summarise LLM call: %w", err)
|
||||||
|
}
|
||||||
|
text := strings.TrimSpace(resp.Text())
|
||||||
|
if text == "" {
|
||||||
|
return "", errors.New("compactor: summarizer returned empty text")
|
||||||
|
}
|
||||||
|
return text, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// estimateTokens is the chars/4 heuristic over a message slice's text,
|
||||||
|
// tool calls, and tool results. Images count a flat ~1K tokens each.
|
||||||
|
// It intentionally matches the coarse estimator the old agentkit loop
|
||||||
|
// used — the 0.7 threshold ratio provides the safety margin.
|
||||||
|
func estimateTokens(msgs []llm.Message) int {
|
||||||
|
chars := 0
|
||||||
|
for _, m := range msgs {
|
||||||
|
for _, p := range m.Parts {
|
||||||
|
switch v := p.(type) {
|
||||||
|
case llm.TextPart:
|
||||||
|
chars += len(v.Text)
|
||||||
|
case llm.ImagePart:
|
||||||
|
chars += 4096
|
||||||
|
default:
|
||||||
|
// llm.Part is a sealed-but-extensible interface (future media
|
||||||
|
// kinds). Count an unknown part conservatively (like an image)
|
||||||
|
// rather than 0, so a transcript of unrecognised content can't
|
||||||
|
// silently slip under the compaction threshold and 400 the
|
||||||
|
// model. Bump this if a large new part kind lands.
|
||||||
|
_ = v
|
||||||
|
chars += 4096
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, tc := range m.ToolCalls {
|
||||||
|
chars += len(tc.Name) + len(tc.Arguments)
|
||||||
|
}
|
||||||
|
for _, tr := range m.ToolResults {
|
||||||
|
chars += len(tr.Content)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return chars / 4
|
||||||
|
}
|
||||||
|
|
||||||
|
// transcriptMessageCap limits each individual message body to ~2KB so that
// one ultra-long tool result cannot dominate the prompt sent to the
// summarizer.
const transcriptMessageCap = 2048

// secretBearingTools lists, by exact name, tools whose ARGUMENTS routinely
// carry credentials or message bodies (bearer tokens, API keys, recipients,
// request bodies). Their args are dropped before the transcript reaches the
// summarizer model — which may be a different provider/tier than the run
// model — mirroring the redaction run/steps.go applies to user-facing step
// summaries. The http_* and webhook_* families are matched by prefix in
// redactToolArgs.
var secretBearingTools = map[string]bool{
	"mcp_call":   true,
	"email_send": true,
}

// redactToolArgs returns a summariser-safe rendering of a tool call's args:
// the literal "[redacted]" when name is a listed secret-bearing tool or
// starts with "http_" or "webhook_", and args verbatim otherwise.
func redactToolArgs(name, args string) string {
	switch {
	case secretBearingTools[name],
		strings.HasPrefix(name, "http_"),
		strings.HasPrefix(name, "webhook_"):
		return "[redacted]"
	default:
		return args
	}
}
|
||||||
|
|
||||||
|
// renderTranscript flattens a message slice to a plain-text transcript
|
||||||
|
// suitable for the summarisation prompt. Tool calls show name + (redacted) args,
|
||||||
|
// tool results show name + body. Empty fields are skipped.
|
||||||
|
//
|
||||||
|
// NOTE: tool-RESULT bodies are forwarded to the summarizer (the summary needs
|
||||||
|
// the findings). A host whose tool results may contain secrets and whose
|
||||||
|
// summarizer tier resolves to an untrusted provider should ensure that tier is
|
||||||
|
// trusted, or pre-sanitise results before they reach the agent loop.
|
||||||
|
func renderTranscript(msgs []llm.Message) string {
|
||||||
|
var sb strings.Builder
|
||||||
|
for i, m := range msgs {
|
||||||
|
fmt.Fprintf(&sb, "---\n[%d] role=%s\n", i+1, m.Role)
|
||||||
|
if text := m.Text(); text != "" {
|
||||||
|
sb.WriteString(truncate(text, transcriptMessageCap))
|
||||||
|
sb.WriteString("\n")
|
||||||
|
}
|
||||||
|
for _, tc := range m.ToolCalls {
|
||||||
|
fmt.Fprintf(&sb, "tool_call name=%s args=%s\n", tc.Name, truncate(redactToolArgs(tc.Name, string(tc.Arguments)), transcriptMessageCap))
|
||||||
|
}
|
||||||
|
for _, tr := range m.ToolResults {
|
||||||
|
fmt.Fprintf(&sb, "tool_result name=%s body=%s\n", tr.Name, truncate(tr.Content, transcriptMessageCap))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return sb.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
// truncate caps s at n bytes, appending "...(truncated)" when anything was
// cut. Strings of at most n bytes are returned unchanged.
//
// The cut never lands inside a multi-byte UTF-8 sequence: if byte n is a
// continuation byte, the cut backs off (by at most 3 bytes) to the start of
// that rune, so the result stays valid UTF-8 for valid input. A negative n is
// treated as 0 instead of panicking on the slice expression.
func truncate(s string, n int) string {
	if n < 0 {
		n = 0
	}
	if len(s) <= n {
		return s
	}

	// isContinuation reports whether b is a UTF-8 continuation byte (10xxxxxx).
	isContinuation := func(b byte) bool { return b&0xC0 == 0x80 }

	// len(s) > n here, so s[cut] is always in range.
	cut := n
	for cut > 0 && n-cut < 3 && isContinuation(s[cut]) {
		cut--
	}
	if isContinuation(s[cut]) {
		// Not valid UTF-8 around the boundary; fall back to the plain byte cut.
		cut = n
	}
	return s[:cut] + "...(truncated)"
}
|
||||||
+35
-13
@@ -1,27 +1,49 @@
|
|||||||
// Command minimal demonstrates executus's standalone core primitives available
|
// Command minimal is executus's "hello, agentic world": wire a model resolver,
|
||||||
// today (P0): the config seam + bounded fan-out. The full zero-config "agentic
|
// a tool registry, and the run executor, then run an agent. With no batteries
|
||||||
// in ~12 lines" example arrives once the model, tool, and run packages land
|
// (Audit/Budget/Critic/Checkpointer/Palette/Delivery all nil) this is a
|
||||||
// (P1–P3).
|
// bounded, in-memory run — the light-host shape (gadfly's case).
|
||||||
|
//
|
||||||
|
// Run it with a provider key for the configured tier, e.g.
|
||||||
|
//
|
||||||
|
// ANTHROPIC_API_KEY=sk-... go run ./examples/minimal
|
||||||
|
//
|
||||||
|
// Override a tier from the environment without touching code, e.g.
|
||||||
|
//
|
||||||
|
// EXECUTUS_MODEL_TIER_FAST=openai/gpt-4o-mini ANTHROPIC_API_KEY= OPENAI_KEY=sk-... go run ./examples/minimal
|
||||||
package main
|
package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log"
|
||||||
|
|
||||||
"gitea.stevedudenhoeffer.com/steve/executus/config"
|
"gitea.stevedudenhoeffer.com/steve/executus/config"
|
||||||
"gitea.stevedudenhoeffer.com/steve/executus/fanout"
|
"gitea.stevedudenhoeffer.com/steve/executus/model"
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/run"
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
||||||
)
|
)
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
cfg := config.Env("EXECUTUS_") // e.g. EXECUTUS_FANOUT_MAX_CONCURRENT=8
|
// 1. Configure model tiers: live values come from the environment
|
||||||
max := cfg.Int("fanout.max_concurrent", 4)
|
// (EXECUTUS_MODEL_TIER_<NAME>), falling back to these defaults.
|
||||||
|
model.Configure(config.Env("EXECUTUS_"), map[string]string{
|
||||||
|
"fast": "anthropic/claude-haiku-4-5",
|
||||||
|
"thinking": "anthropic/claude-opus-4-8",
|
||||||
|
}, 0)
|
||||||
|
|
||||||
items := []string{"alpha", "beta", "gamma", "delta"}
|
// 2. Build the executor: a tool registry + the model resolver. No batteries.
|
||||||
results := fanout.Run(context.Background(), items,
|
ex := run.New(run.Config{
|
||||||
fanout.Options[string]{MaxConcurrent: max},
|
Registry: tool.NewRegistry(),
|
||||||
func(_ context.Context, s string) (int, error) { return len(s), nil })
|
Models: model.ParseModelForContext,
|
||||||
|
})
|
||||||
|
|
||||||
for _, r := range results {
|
// 3. Run an agent and print its answer.
|
||||||
fmt.Printf("%-6s -> %d (err=%v)\n", items[r.Index], r.Value, r.Err)
|
res := ex.Run(context.Background(),
|
||||||
|
run.RunnableAgent{Name: "assistant", SystemPrompt: "You are concise.", ModelTier: "fast"},
|
||||||
|
tool.Invocation{RunID: "demo-1", CallerID: "local"},
|
||||||
|
"In one sentence, what is an agent harness?")
|
||||||
|
if res.Err != nil {
|
||||||
|
log.Fatalf("run failed: %v", res.Err)
|
||||||
}
|
}
|
||||||
|
fmt.Println(res.Output)
|
||||||
}
|
}
|
||||||
|
|||||||
+4
-1
@@ -34,6 +34,7 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -574,7 +575,9 @@ func (h *Helper) recordLedger(ctx context.Context, call MetaCall) {
|
|||||||
if h.storage == nil {
|
if h.storage == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
_ = h.storage.RecordMetaCall(ctx, call)
|
if err := h.storage.RecordMetaCall(ctx, call); err != nil {
|
||||||
|
slog.Warn("llmmeta: failed to record ledger row", "err", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// tryParseJSON attempts to decode text as JSON. Returns the parsed
|
// tryParseJSON attempts to decode text as JSON. Returns the parsed
|
||||||
|
|||||||
+1
-1
@@ -237,7 +237,7 @@ func recordUsage(ctx context.Context, resp *llm.Response) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
u := resp.Usage
|
u := resp.Usage
|
||||||
if u.InputTokens == 0 && u.OutputTokens == 0 {
|
if u.InputTokens == 0 && u.OutputTokens == 0 && u.CacheReadTokens == 0 && u.CacheWriteTokens == 0 {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
model := resolvedModelName(ctx, resp)
|
model := resolvedModelName(ctx, resp)
|
||||||
|
|||||||
+23
-2
@@ -314,7 +314,7 @@ func (c *CloudOllamaLimitCache) fetchTags(ctx context.Context) ([]string, error)
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
body, err := io.ReadAll(resp.Body)
|
body, err := readCapped(resp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -367,7 +367,7 @@ func (c *CloudOllamaLimitCache) fetchContextLength(ctx context.Context, modelNam
|
|||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
respBody, err := io.ReadAll(resp.Body)
|
respBody, err := readCapped(resp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
@@ -451,3 +451,24 @@ func truncate(b []byte, n int) string {
|
|||||||
}
|
}
|
||||||
return string(b[:n]) + "...(truncated)"
|
return string(b[:n]) + "...(truncated)"
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// maxLimitCacheResponseBytes is the ceiling on the ollama.com limit-cache HTTP
// responses (/api/tags, /api/show), so a misbehaving endpoint can't stream an
// unbounded body before the 15s timeout fires. 1 MiB is far above any real
// response.
const maxLimitCacheResponseBytes = 1 << 20

// readCapped drains r, accepting at most maxLimitCacheResponseBytes bytes.
//
// The reader is limited to one byte MORE than the cap, so an over-long body
// is detected and reported as an explicit error instead of being silently
// truncated and failing later in json.Unmarshal with an opaque
// "unexpected end of JSON input". Read errors are returned as-is.
func readCapped(r io.Reader) ([]byte, error) {
	limited := &io.LimitedReader{R: r, N: maxLimitCacheResponseBytes + 1}
	data, err := io.ReadAll(limited)
	switch {
	case err != nil:
		return nil, err
	case len(data) > maxLimitCacheResponseBytes:
		return nil, fmt.Errorf("cloud_sync: response exceeded %d bytes", maxLimitCacheResponseBytes)
	}
	return data, nil
}
|
||||||
|
|||||||
@@ -16,6 +16,11 @@ import (
|
|||||||
// UsageSink receives one record per successful Generate through a model parsed
// by this package (ParseModelRequest / ParseModelForContext). Implement it to
// meter or bill; the token detail mirrors majordomo's Response.Usage.
//
// IMPORTANT: cacheReadTokens and cacheWriteTokens are PORTIONS of inputTokens,
// not independent additive values (they exist so a sink can price cached and
// fresh input differently). A sink must NOT compute
// total = input + output + cacheRead + cacheWrite — that double-counts the
// cached input.
type UsageSink interface {
	// Record reports the token usage of one Generate call made against model.
	Record(ctx context.Context, model string, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens int)
}
|
||||||
|
|||||||
@@ -0,0 +1,76 @@
|
|||||||
|
package run
|
||||||
|
|
||||||
|
import "time"
|
||||||
|
|
||||||
|
// RunnableAgent is the kernel's view of "a thing to run": an identity, a model
// tier, a system prompt, execution caps, and a tool palette. It is a plain DTO
// on purpose — the run kernel never imports a noun battery. The persona Agent
// and the saved Skill each LOWER themselves into a RunnableAgent (a ToRunnable
// method on the battery side), and the kernel runs the DTO. This is the
// inversion of mort's agentexec.Executor.Run(*agents.Agent): the executor no
// longer depends on the persona struct, only on this shape.
//
// A light host can build a RunnableAgent inline (model tier + prompt + a few
// tool names) for a one-shot bounded run, with no persona or skill battery at
// all — that is exactly gadfly's swarm task.
type RunnableAgent struct {
	// ID is a stable identifier for the run subject (an agent/skill UUID, or
	// any host-chosen id). Used for audit attribution and dispatch-guard
	// genealogy. Empty is allowed for anonymous one-shot runs.
	ID string

	// Name is a human-readable label (audit/logs/delivery). Empty is allowed.
	Name string

	// SystemPrompt is the agent's base system prompt (before per-run
	// personalization, which a host layers via Ports).
	SystemPrompt string

	// ModelTier is a tier alias or concrete spec resolved through
	// model.ParseModelForContext. Empty resolves to the host's default tier.
	ModelTier string

	// MaxIterations caps the agent loop's tool-dispatch steps. 0 = kernel
	// default.
	//
	// MaxRuntime caps wall-clock time for the whole run (the kernel starts
	// this clock AFTER any lane dequeue, not at submission). 0 = kernel
	// default.
	MaxIterations int
	MaxRuntime    time.Duration

	// LowLevelTools are tool-registry names the run may call directly.
	//
	// SkillPalette / SubAgentPalette name saved skills / sub-agents exposed as
	// skill__<name> / agent__<name> delegation tools, resolved through
	// Ports.Palette (nil Palette => those entries are inert).
	LowLevelTools   []string
	SkillPalette    []string
	SubAgentPalette []string

	// Phases optionally model a multi-step pipeline (each phase its own prompt
	// + tier + tools). An empty slice is a single-phase run — the common case.
	Phases []Phase

	// Critic configures the optional two-tier run-critic (Ports.Critic). The
	// zero value (disabled) is the light-host default.
	Critic CriticConfig
}
|
||||||
|
|
||||||
|
// Phase is one step of a multi-step run: its own system prompt, model tier,
// iteration cap, and tool subset. Optional phases may be skipped by the
// pipeline when their precondition isn't met.
type Phase struct {
	Name          string   // label for this phase
	SystemPrompt  string   // system prompt used while this phase runs
	ModelTier     string   // tier alias or concrete model spec for this phase
	MaxIterations int      // iteration cap for this phase
	Tools         []string // tool subset available to this phase
	Optional      bool     // true when the pipeline may skip this phase
}
|
||||||
|
|
||||||
|
// CriticConfig configures the optional run-critic.
//
// Enabled gates whether a critic monitor is started at all.
// BackstopMultiplier sets the hard-kill deadline as a multiple of the soft
// trigger (MaxRuntime); a non-positive multiplier uses the kernel default.
type CriticConfig struct {
	Enabled            bool
	BackstopMultiplier float64
}
|
||||||
+318
@@ -0,0 +1,318 @@
|
|||||||
|
package run
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/agent"
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/compact"
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ModelResolver resolves a tier alias or concrete spec to a usable llm.Model
// and an enriched context (for usage attribution). model.ParseModelForContext
// satisfies it.
//
// The executor treats a nil model returned alongside a nil error as a
// resolution failure rather than handing it to the agent loop.
type ModelResolver func(ctx context.Context, tier string) (context.Context, llm.Model, error)
|
||||||
|
|
||||||
|
// Defaults holds the executor-wide fallback caps and loop guards. A value here
// is used for a run whenever the RunnableAgent leaves the matching field zero.
type Defaults struct {
	MaxIterations            int           // tool-dispatch steps; default 12
	MaxRuntime               time.Duration // wall-clock per run; default 60s
	FallbackTier             string        // tier when the agent's is empty; default "fast"
	MaxConsecutiveToolErrors int           // loop guard; default 3
	MaxSameToolCallRepeats   int           // retry-storm guard; default 3
	CompactionThresholdRatio float64       // fraction of model context to compact at; default 0.7
}

// withFallbacks returns a copy of d in which every unset field — a
// non-positive number or an empty tier — is replaced by its built-in default.
// Fields that are already set are left untouched; d itself is not modified.
func (d Defaults) withFallbacks() Defaults {
	const (
		fallbackIterations  = 12
		fallbackRuntime     = 60 * time.Second
		fallbackTier        = "fast"
		fallbackToolErrors  = 3
		fallbackCallRepeats = 3
		fallbackCompactAt   = 0.7
	)

	filled := d
	if filled.MaxIterations < 1 {
		filled.MaxIterations = fallbackIterations
	}
	if filled.MaxRuntime < 1 {
		filled.MaxRuntime = fallbackRuntime
	}
	if len(filled.FallbackTier) == 0 {
		filled.FallbackTier = fallbackTier
	}
	if filled.MaxConsecutiveToolErrors < 1 {
		filled.MaxConsecutiveToolErrors = fallbackToolErrors
	}
	if filled.MaxSameToolCallRepeats < 1 {
		filled.MaxSameToolCallRepeats = fallbackCallRepeats
	}
	// Kept as "<= 0" (not "< some epsilon"): any positive ratio is honored.
	if filled.CompactionThresholdRatio <= 0 {
		filled.CompactionThresholdRatio = fallbackCompactAt
	}
	return filled
}
|
||||||
|
|
||||||
|
// Config wires an Executor. Registry + Models are required (New panics without
// them); everything else is optional and nil-safe — the zero Config beyond
// those yields a bounded, in-memory run with no persistence, audit, budget,
// critic, delegation, or compaction (gadfly's case).
type Config struct {
	// Registry builds the per-run toolbox from a RunnableAgent's
	// LowLevelTools. Required.
	Registry tool.Registry
	// Models resolves a tier alias or concrete spec to a model. Required.
	Models ModelResolver
	// Defaults supplies fallback caps; unset fields are filled in by New.
	Defaults Defaults
	// Ports are the optional host seams (audit, budget, ...).
	Ports Ports

	// Compactor mints the per-run context-compaction hook. nil disables
	// compaction. ContextTokens resolves a tier's model context-window (for
	// the compaction threshold); nil — or a zero return — also disables it.
	Compactor     compact.CompactorFactory
	ContextTokens func(tier string) int

	// SystemHeader is an optional platform header prepended to every agent's
	// system prompt.
	SystemHeader string
}
|
||||||
|
|
||||||
|
// Executor runs a RunnableAgent through majordomo's agent loop with the wired
// Ports. Construct with New; safe for concurrent use across runs.
type Executor struct {
	// cfg is the wiring captured by New, with Defaults already filled in.
	cfg Config
}
|
||||||
|
|
||||||
|
// New builds an Executor. It panics if Registry or Models is nil — those are
|
||||||
|
// structural, not runtime, errors.
|
||||||
|
func New(cfg Config) *Executor {
|
||||||
|
if cfg.Registry == nil || cfg.Models == nil {
|
||||||
|
panic("run.New: Registry and Models are required")
|
||||||
|
}
|
||||||
|
cfg.Defaults = cfg.Defaults.withFallbacks()
|
||||||
|
return &Executor{cfg: cfg}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Result is one run's outcome. Err carries the run failure (if any); the other
// fields are populated best-effort even on error (partial output/steps/usage).
type Result struct {
	RunID  string      // copied from the invocation's RunID
	Output string      // the agent loop's final output
	Steps  []tool.Step // tool steps collected while the run progressed
	Usage  llm.Usage   // token usage reported by the agent loop
	Err    error       // nil on success
}
|
||||||
|
|
||||||
|
// Run executes ra with the given invocation + input and returns the Result. It
|
||||||
|
// never propagates a panic; failures surface in Result.Err.
|
||||||
|
func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocation, input string) Result {
|
||||||
|
started := time.Now()
|
||||||
|
res := Result{RunID: inv.RunID}
|
||||||
|
|
||||||
|
tier := ra.ModelTier
|
||||||
|
if tier == "" {
|
||||||
|
tier = e.cfg.Defaults.FallbackTier
|
||||||
|
}
|
||||||
|
maxIter := ra.MaxIterations
|
||||||
|
if maxIter <= 0 {
|
||||||
|
maxIter = e.cfg.Defaults.MaxIterations
|
||||||
|
}
|
||||||
|
maxRuntime := ra.MaxRuntime
|
||||||
|
if maxRuntime <= 0 {
|
||||||
|
maxRuntime = e.cfg.Defaults.MaxRuntime
|
||||||
|
}
|
||||||
|
|
||||||
|
// Budget gate (pre-run): a rejected run makes no model call.
|
||||||
|
if e.cfg.Ports.Budget != nil {
|
||||||
|
if err := e.cfg.Ports.Budget.Check(ctx, inv.CallerID); err != nil {
|
||||||
|
res.Err = err
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Resolve the model (enriches ctx for usage attribution).
|
||||||
|
modelCtx, model, err := e.cfg.Models(ctx, tier)
|
||||||
|
if err != nil {
|
||||||
|
res.Err = fmt.Errorf("resolve model %q: %w", tier, err)
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
if model == nil {
|
||||||
|
// A resolver returning (ctx, nil, nil) would otherwise nil-panic inside
|
||||||
|
// the agent loop; surface it as a clean error (Run never panics out).
|
||||||
|
res.Err = fmt.Errorf("resolve model %q: resolver returned a nil model", tier)
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
ctx = modelCtx
|
||||||
|
|
||||||
|
// Audit start (optional). The recorder satisfies RunTally; stamp it on the
|
||||||
|
// invocation so a self-status tool can read live progress.
|
||||||
|
var rec RunRecorder
|
||||||
|
var stateAcc *RunStateAccessor
|
||||||
|
if e.cfg.Ports.Audit != nil {
|
||||||
|
rec = e.cfg.Ports.Audit.StartRun(ctx, RunInfo{
|
||||||
|
RunID: inv.RunID,
|
||||||
|
SubjectID: ra.ID,
|
||||||
|
Name: ra.Name,
|
||||||
|
CallerID: inv.CallerID,
|
||||||
|
ChannelID: inv.ChannelID,
|
||||||
|
ParentRunID: inv.ParentRunID,
|
||||||
|
Inputs: inv.SkillInputs,
|
||||||
|
StartedAt: started,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
if rec != nil {
|
||||||
|
stateAcc = NewRunStateAccessor(rec, maxIter, 0, started)
|
||||||
|
inv.RunState = stateAcc
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build the toolbox from the agent's low-level tools.
|
||||||
|
toolbox, err := e.cfg.Registry.Build(ra.LowLevelTools, inv, tool.Visibility("private"), nil)
|
||||||
|
if err != nil {
|
||||||
|
res.Err = fmt.Errorf("build toolbox: %w", err)
|
||||||
|
e.finishAudit(ctx, rec, "error", res, started, res.Err)
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
// Run context: bound by MaxRuntime, detached from the caller's deadline so a
|
||||||
|
// lane/queue wait doesn't eat the run budget (mort's V10 lesson). Caller
|
||||||
|
// cancellation still propagates via MergeCancellation. Created BEFORE the
|
||||||
|
// step observer so the observer forwards the merged run context (not a
|
||||||
|
// possibly-cancelled caller ctx) to OnStep consumers.
|
||||||
|
runCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), maxRuntime)
|
||||||
|
defer cancel()
|
||||||
|
runCtx, mergeCancel := MergeCancellation(runCtx, ctx)
|
||||||
|
defer mergeCancel()
|
||||||
|
|
||||||
|
// Step instrumentation: accumulate Result.Steps + fire inv.OnStep, feed the
|
||||||
|
// audit recorder, and keep the live iteration counter fresh. majordomo's
|
||||||
|
// step observer hands us each completed iteration; we zip the model's tool
|
||||||
|
// calls with their executed results PAIRWISE — a result without a matching
|
||||||
|
// call (or a call without a result) is skipped rather than recorded as an
|
||||||
|
// empty-name "ghost" step.
|
||||||
|
emitter := newStepEmitter(inv.OnStep)
|
||||||
|
stepObserver := func(s agent.Step) {
|
||||||
|
if stateAcc != nil {
|
||||||
|
stateAcc.SetIteration(s.Index)
|
||||||
|
}
|
||||||
|
if rec != nil {
|
||||||
|
rec.OnStep(s.Index, s.Response)
|
||||||
|
}
|
||||||
|
var calls []llm.ToolCall
|
||||||
|
if s.Response != nil {
|
||||||
|
calls = s.Response.ToolCalls
|
||||||
|
}
|
||||||
|
n := len(s.Results)
|
||||||
|
if len(calls) < n {
|
||||||
|
n = len(calls)
|
||||||
|
}
|
||||||
|
for i := 0; i < n; i++ {
|
||||||
|
call, r := calls[i], s.Results[i]
|
||||||
|
emitter.toolStart(runCtx, call.Name, call.Arguments)
|
||||||
|
emitter.toolEnd(runCtx, call, r.Content, r.IsError)
|
||||||
|
if rec != nil {
|
||||||
|
rec.OnTool(call, r.Content)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
opts := []agent.Option{
|
||||||
|
agent.WithToolbox(toolbox),
|
||||||
|
agent.WithMaxSteps(maxIter),
|
||||||
|
agent.WithToolErrorLimits(e.cfg.Defaults.MaxConsecutiveToolErrors, e.cfg.Defaults.MaxSameToolCallRepeats),
|
||||||
|
agent.WithStepObserver(stepObserver),
|
||||||
|
}
|
||||||
|
if e.cfg.Compactor != nil && e.cfg.ContextTokens != nil {
|
||||||
|
if threshold := e.compactionThreshold(tier); threshold > 0 {
|
||||||
|
// Forward compaction events to the audit log (makes the
|
||||||
|
// CompactionEvent doc's "logged to the run trace" promise true).
|
||||||
|
var onFire func(compact.CompactionEvent)
|
||||||
|
if rec != nil {
|
||||||
|
onFire = func(ev compact.CompactionEvent) {
|
||||||
|
rec.LogEvent("compaction_fired", map[string]any{
|
||||||
|
"messages_before": ev.MessagesBefore,
|
||||||
|
"messages_after": ev.MessagesAfter,
|
||||||
|
"tokens_before": ev.TokensBefore,
|
||||||
|
"tokens_after": ev.TokensAfter,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
opts = append(opts, agent.WithCompactor(e.cfg.Compactor(threshold, onFire)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ag := agent.New(model, e.systemPrompt(ra), opts...)
|
||||||
|
runRes, runErr := ag.Run(runCtx, input)
|
||||||
|
|
||||||
|
status := statusFor(runErr)
|
||||||
|
if runRes != nil {
|
||||||
|
res.Output = runRes.Output
|
||||||
|
res.Usage = runRes.Usage
|
||||||
|
}
|
||||||
|
res.Steps = emitter.snapshot()
|
||||||
|
res.Err = runErr
|
||||||
|
|
||||||
|
e.finishAudit(ctx, rec, status, res, started, runErr)
|
||||||
|
if e.cfg.Ports.Budget != nil {
|
||||||
|
e.cfg.Ports.Budget.Commit(detach(ctx), inv.CallerID, time.Since(started).Seconds())
|
||||||
|
}
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
// statusFor translates a run error into the RunStats.Status string so audit
// consumers can tell outcomes apart:
//
//	nil                                -> "ok"
//	wraps context.DeadlineExceeded     -> "timeout"
//	wraps context.Canceled             -> "cancelled"
//	anything else                      -> "error"
//
// The deadline check runs first, so an error matching both sentinels is
// reported as "timeout".
func statusFor(runErr error) string {
	if runErr == nil {
		return "ok"
	}
	if errors.Is(runErr, context.DeadlineExceeded) {
		return "timeout"
	}
	if errors.Is(runErr, context.Canceled) {
		return "cancelled"
	}
	return "error"
}
|
||||||
|
|
||||||
|
// finishAudit writes the terminal roll-up on a detached context so a cancelled
|
||||||
|
// run still records (mort's CleanupContextTimeout lesson).
|
||||||
|
func (e *Executor) finishAudit(ctx context.Context, rec RunRecorder, status string, res Result, started time.Time, runErr error) {
|
||||||
|
if rec == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
stats := RunStats{
|
||||||
|
Status: status,
|
||||||
|
Output: res.Output,
|
||||||
|
ToolCalls: rec.ToolCallsCount(),
|
||||||
|
RuntimeSeconds: time.Since(started).Seconds(),
|
||||||
|
}
|
||||||
|
if runErr != nil {
|
||||||
|
stats.Error = runErr.Error()
|
||||||
|
}
|
||||||
|
stats.InputTokens, stats.OutputTokens, stats.ThinkingTokens = rec.TokenStats()
|
||||||
|
rec.Close(detach(ctx), stats)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *Executor) systemPrompt(ra RunnableAgent) string {
|
||||||
|
if e.cfg.SystemHeader == "" {
|
||||||
|
return ra.SystemPrompt
|
||||||
|
}
|
||||||
|
if ra.SystemPrompt == "" {
|
||||||
|
return e.cfg.SystemHeader
|
||||||
|
}
|
||||||
|
return e.cfg.SystemHeader + "\n\n" + ra.SystemPrompt
|
||||||
|
}
|
||||||
|
|
||||||
|
// compactionThreshold returns the token threshold for the tier's model context
|
||||||
|
// window (ratio × limit), or 0 when the limit is unknown.
|
||||||
|
func (e *Executor) compactionThreshold(tier string) int {
|
||||||
|
max := e.cfg.ContextTokens(tier)
|
||||||
|
if max <= 0 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
return int(float64(max) * e.cfg.Defaults.CompactionThresholdRatio)
|
||||||
|
}
|
||||||
|
|
||||||
|
// detach derives a bounded cleanup context off ctx, detached from its
|
||||||
|
// cancellation, for post-run writes. The cancel is intentionally not returned;
|
||||||
|
// CleanupContextTimeout bounds the lifetime.
|
||||||
|
func detach(ctx context.Context) context.Context {
|
||||||
|
c, cancel := context.WithTimeout(context.WithoutCancel(ctx), CleanupContextTimeout)
|
||||||
|
_ = cancel // bounded by the timeout; nothing to cancel early
|
||||||
|
return c
|
||||||
|
}
|
||||||
@@ -0,0 +1,168 @@
|
|||||||
|
package run
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
||||||
|
)
|
||||||
|
|
||||||
|
// fakeModels returns a ModelResolver backed by a fake provider scripted to
|
||||||
|
// reply with the given text (no tool calls — the loop terminates immediately).
|
||||||
|
func fakeModels(t *testing.T, reply string) ModelResolver {
|
||||||
|
t.Helper()
|
||||||
|
fp := fake.New("fake")
|
||||||
|
fp.Enqueue("test-model", fake.Reply(reply))
|
||||||
|
m, err := fp.Model("test-model")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("fake model: %v", err)
|
||||||
|
}
|
||||||
|
return func(ctx context.Context, _ string) (context.Context, llm.Model, error) {
|
||||||
|
return ctx, m, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestExecutorRunHelloWorld is the milestone: executus runs an agent end-to-end
|
||||||
|
// against the fake provider and returns its output. Proves the kernel is
|
||||||
|
// runnable with the zero Ports (no persistence/audit/budget/critic).
|
||||||
|
func TestExecutorRunHelloWorld(t *testing.T) {
|
||||||
|
ex := New(Config{
|
||||||
|
Registry: tool.NewRegistry(),
|
||||||
|
Models: fakeModels(t, "hello from executus"),
|
||||||
|
})
|
||||||
|
|
||||||
|
res := ex.Run(context.Background(),
|
||||||
|
RunnableAgent{Name: "greeter", SystemPrompt: "be brief", ModelTier: "test-model"},
|
||||||
|
tool.Invocation{RunID: "run-1", CallerID: "caller-1"},
|
||||||
|
"say hi")
|
||||||
|
|
||||||
|
if res.Err != nil {
|
||||||
|
t.Fatalf("run error: %v", res.Err)
|
||||||
|
}
|
||||||
|
if res.Output != "hello from executus" {
|
||||||
|
t.Fatalf("output = %q, want %q", res.Output, "hello from executus")
|
||||||
|
}
|
||||||
|
if res.RunID != "run-1" {
|
||||||
|
t.Errorf("RunID = %q, want run-1", res.RunID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestExecutorBudgetRejection: a Budget that denies makes no model call.
|
||||||
|
func TestExecutorBudgetRejection(t *testing.T) {
|
||||||
|
denied := errors.New("over budget")
|
||||||
|
var modelCalled bool
|
||||||
|
models := func(ctx context.Context, _ string) (context.Context, llm.Model, error) {
|
||||||
|
modelCalled = true
|
||||||
|
return ctx, nil, nil
|
||||||
|
}
|
||||||
|
ex := New(Config{
|
||||||
|
Registry: tool.NewRegistry(),
|
||||||
|
Models: models,
|
||||||
|
Ports: Ports{Budget: budgetFunc{check: func(string) error { return denied }}},
|
||||||
|
})
|
||||||
|
|
||||||
|
res := ex.Run(context.Background(),
|
||||||
|
RunnableAgent{ModelTier: "test-model"},
|
||||||
|
tool.Invocation{RunID: "r", CallerID: "broke"}, "hi")
|
||||||
|
|
||||||
|
if !errors.Is(res.Err, denied) {
|
||||||
|
t.Fatalf("err = %v, want budget denial", res.Err)
|
||||||
|
}
|
||||||
|
if modelCalled {
|
||||||
|
t.Error("model must not be resolved/called when budget denies")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestExecutorAuditWiring: the Audit port receives StartRun + Close with the
|
||||||
|
// terminal status/output.
|
||||||
|
func TestExecutorAuditWiring(t *testing.T) {
|
||||||
|
rec := &captureRecorder{}
|
||||||
|
ex := New(Config{
|
||||||
|
Registry: tool.NewRegistry(),
|
||||||
|
Models: fakeModels(t, "done"),
|
||||||
|
Ports: Ports{Audit: auditFunc{start: func(RunInfo) RunRecorder { return rec }}},
|
||||||
|
})
|
||||||
|
|
||||||
|
res := ex.Run(context.Background(),
|
||||||
|
RunnableAgent{ModelTier: "test-model"},
|
||||||
|
tool.Invocation{RunID: "r2", CallerID: "c"}, "go")
|
||||||
|
|
||||||
|
if res.Err != nil {
|
||||||
|
t.Fatalf("run error: %v", res.Err)
|
||||||
|
}
|
||||||
|
if !rec.closed {
|
||||||
|
t.Fatal("recorder.Close was not called")
|
||||||
|
}
|
||||||
|
if rec.stats.Status != "ok" {
|
||||||
|
t.Errorf("close status = %q, want ok", rec.stats.Status)
|
||||||
|
}
|
||||||
|
if rec.stats.Output != "done" {
|
||||||
|
t.Errorf("close output = %q, want done", rec.stats.Output)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- test doubles ---
|
||||||
|
|
||||||
|
// budgetFunc is a test double for the Budget port: Check delegates to the
// wrapped function and Commit discards its arguments.
type budgetFunc struct{ check func(callerID string) error }

// Check forwards callerID to the wrapped check function; the context is unused.
func (b budgetFunc) Check(_ context.Context, callerID string) error {
	verdict := b.check(callerID)
	return verdict
}

// Commit is a no-op: these tests do not meter spend.
func (b budgetFunc) Commit(context.Context, string, float64) {
}
|
||||||
|
|
||||||
|
type auditFunc struct{ start func(RunInfo) RunRecorder }
|
||||||
|
|
||||||
|
func (a auditFunc) StartRun(_ context.Context, info RunInfo) RunRecorder { return a.start(info) }
|
||||||
|
|
||||||
|
type captureRecorder struct {
|
||||||
|
closed bool
|
||||||
|
stats RunStats
|
||||||
|
steps int
|
||||||
|
tools int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *captureRecorder) TokenStats() (in, out, thinking int64) { return 0, 0, 0 }
|
||||||
|
func (r *captureRecorder) ToolCallsCount() int { return r.tools }
|
||||||
|
func (r *captureRecorder) OnStep(int, *llm.Response) { r.steps++ }
|
||||||
|
func (r *captureRecorder) OnTool(llm.ToolCall, string) { r.tools++ }
|
||||||
|
func (r *captureRecorder) LogEvent(string, map[string]any) {}
|
||||||
|
func (r *captureRecorder) LogError(string) {}
|
||||||
|
func (r *captureRecorder) Close(_ context.Context, s RunStats) { r.closed = true; r.stats = s }
|
||||||
|
|
||||||
|
// TestExecutorNilModelNoPanic: a resolver returning (ctx, nil, nil) yields a
|
||||||
|
// clean error, not a nil-pointer panic (gadfly F1, high severity).
|
||||||
|
func TestExecutorNilModelNoPanic(t *testing.T) {
|
||||||
|
ex := New(Config{
|
||||||
|
Registry: tool.NewRegistry(),
|
||||||
|
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) {
|
||||||
|
return ctx, nil, nil // nil model, nil error
|
||||||
|
},
|
||||||
|
})
|
||||||
|
res := ex.Run(context.Background(),
|
||||||
|
RunnableAgent{ModelTier: "x"}, tool.Invocation{RunID: "r"}, "hi")
|
||||||
|
if res.Err == nil {
|
||||||
|
t.Fatal("expected an error for a nil model, got nil (would have panicked in the loop)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestStatusFor maps run errors to RunStats.Status (gadfly F3).
|
||||||
|
func TestStatusFor(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
err error
|
||||||
|
want string
|
||||||
|
}{
|
||||||
|
{nil, "ok"},
|
||||||
|
{context.DeadlineExceeded, "timeout"},
|
||||||
|
{context.Canceled, "cancelled"},
|
||||||
|
{fmt.Errorf("wrapped: %w", context.DeadlineExceeded), "timeout"},
|
||||||
|
{errors.New("boom"), "error"},
|
||||||
|
}
|
||||||
|
for _, c := range cases {
|
||||||
|
if got := statusFor(c.err); got != c.want {
|
||||||
|
t.Errorf("statusFor(%v) = %q, want %q", c.err, got, c.want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
+168
@@ -0,0 +1,168 @@
|
|||||||
|
package run
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/deliver"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Ports are the host seams the run executor consumes. Every field is nil-safe:
// a light host passes the zero Ports and gets a bounded, in-memory run with no
// persistence, audit, budget, critic, delegation, or delivery — which is
// exactly a gadfly swarm task. A heavy host (mort) wires each one to a battery.
//
// This struct IS the inversion: in mort, agentexec imports agents /
// agentcritic / skillaudit and skillexec imports skills / paste directly; here
// the kernel depends only on these interfaces, and the batteries implement
// them. The mort_*_adapters.go wall becomes the set of impls.
type Ports struct {
	// Audit records the run trace (start, per-step/per-tool events, final
	// stats). nil = no audit.
	Audit Audit
	// Budget gates and meters per-caller resource use. nil = unbounded.
	Budget Budget
	// Critic optionally monitors a long run for hangs/runaways. nil = none.
	Critic Critic
	// Checkpointer persists resumable progress for durable recovery. nil = no
	// checkpointing (a run interrupted by shutdown is simply lost).
	Checkpointer Checkpointer
	// Palette resolves SkillPalette / SubAgentPalette entries into delegation
	// tools (skill__<name> / agent__<name>). nil = those entries are inert.
	Palette PaletteSource
	// Delivery is where the run's output + artifacts go. nil = the caller
	// reads the Result in-process (the light-host default).
	Delivery deliver.Delivery
}
|
||||||
|
|
||||||
|
// RunInfo describes a run at start time — the attribution a recorder/critic
// needs. Host-neutral rename of mort's SkillRun start fields.
type RunInfo struct {
	RunID       string         // the invocation's run id
	SubjectID   string         // the agent/skill id being run (audit "skill_id")
	Name        string         // human-readable label of the run subject
	CallerID    string         // who requested the run
	ChannelID   string         // channel the invocation carries
	ParentRunID string         // run id of the delegating parent, if any
	Inputs      map[string]any // structured inputs supplied with the invocation
	StartedAt   time.Time      // when the executor began the run
}
|
||||||
|
|
||||||
|
// RunStats is the terminal roll-up a recorder's Close writes. Mirrors mort's
// skillaudit/skillexec RunStats.
type RunStats struct {
	Status         string  // ok | error | timeout | budget_exceeded | cancelled | dry_run
	Output         string  // the run's final output (may be partial on failure)
	Error          string  // the run error's text; empty on success
	ToolCalls      int     // number of tool calls the recorder counted
	RuntimeSeconds float64 // wall-clock seconds since the run started
	InputTokens    int64   // input tokens as tallied by the recorder
	OutputTokens   int64   // output tokens as tallied by the recorder
	ThinkingTokens int64   // thinking tokens as tallied by the recorder
}
|
||||||
|
|
||||||
|
// --- Audit ---
|
||||||
|
|
||||||
|
// Audit begins recording a run. The audit battery wires its Storage behind
// this interface.
type Audit interface {
	// StartRun returns a per-run RunRecorder, or nil to skip recording this
	// run.
	StartRun(ctx context.Context, info RunInfo) RunRecorder
}
|
||||||
|
|
||||||
|
// RunRecorder records the events of one in-flight run and its final stats. It
// satisfies RunTally so the kernel can surface live token/tool counts to the
// self-status tool. Mirrors mort's skillaudit.Writer.
type RunRecorder interface {
	RunTally
	// OnStep records one completed agent-loop iteration's model response.
	OnStep(iter int, resp *llm.Response)
	// OnTool records one executed tool call + its result.
	OnTool(call llm.ToolCall, result string)
	// LogEvent / LogError append structured events to the run log.
	LogEvent(eventType string, payload map[string]any)
	LogError(msg string)
	// Close writes the terminal roll-up. Detaches from the caller's context
	// internally so a cancelled run still records.
	Close(ctx context.Context, stats RunStats)
}
|
||||||
|
|
||||||
|
// --- Budget ---
|
||||||
|
|
||||||
|
// Budget gates and meters per-caller resource use. Mirrors mort's
// skillexec.BudgetTracker.
type Budget interface {
	// Check reports whether the caller has remaining budget (nil = allowed).
	// A non-nil error rejects the run before any model call is made.
	Check(ctx context.Context, callerID string) error
	// Commit records that the caller spent runtimeSeconds on this run.
	Commit(ctx context.Context, callerID string, runtimeSeconds float64)
}
|
||||||
|
|
||||||
|
// --- Critic ---
|
||||||
|
|
||||||
|
// Critic optionally monitors a long-running run (the two-tier soft/hard
// timeout). Monitor returns a handle the executor feeds progress into and
// queries for steer/deadline decisions; a nil handle means "not monitored".
//
// The exact wiring (how the handle's Steer/Deadline bind into majordomo's
// agent.WithSteer / agent.WithMaxStepsFunc / run-context cancellation) is
// finalized in the executor; this is the seam the agentcritic battery adapts.
type Critic interface {
	// Monitor starts watching the run described by info, with softTimeout as
	// the soft trigger.
	Monitor(ctx context.Context, info RunInfo, softTimeout time.Duration) CriticHandle
}
|
||||||
|
|
||||||
|
// CriticHandle is the executor's live link to a run's critic.
type CriticHandle interface {
	// RecordStep / RecordToolStart keep the critic's activity clock fresh so a
	// healthy-but-slow run is not mistaken for a hang.
	RecordStep(iter int)
	RecordToolStart(name, args string)
	// Steer returns any messages the critic wants injected into the loop (a
	// nudge), drained before each step — matches majordomo agent.WithSteer.
	Steer() []llm.Message
	// Deadline returns the current hard-kill deadline (the critic may extend
	// it); the executor binds the run context to it. Zero = no hard deadline.
	Deadline() time.Time
	// Stop ends monitoring when the run finishes.
	Stop()
}
|
||||||
|
|
||||||
|
// --- Checkpointer ---
|
||||||
|
|
||||||
|
// Checkpointer persists a run's resumable progress for durable recovery.
|
||||||
|
// Mirrors mort's agentexec.RunCheckpointer.
|
||||||
|
type Checkpointer interface {
|
||||||
|
// Save persists the run's current resumable progress (throttled).
|
||||||
|
Save(ctx context.Context, st RunCheckpointState) error
|
||||||
|
// Complete clears the checkpoint on success.
|
||||||
|
Complete(ctx context.Context) error
|
||||||
|
// Fail clears the checkpoint on terminal failure. A run interrupted by
|
||||||
|
// shutdown is left untouched so boot recovery picks it up.
|
||||||
|
Fail(ctx context.Context, err error) error
|
||||||
|
}
|
||||||
|
|
||||||
|
// RunCheckpointState is the resumable snapshot a Checkpointer persists. Kept
|
||||||
|
// minimal here; the executor extends what it records during the merge.
|
||||||
|
type RunCheckpointState struct {
|
||||||
|
Messages []llm.Message
|
||||||
|
Iteration int
|
||||||
|
}
|
||||||
|
|
||||||
|
// --- PaletteSource ---
|
||||||
|
|
||||||
|
// PaletteSource turns the names listed in a RunnableAgent's SkillPalette /
// SubAgentPalette into delegation tools and invokes them. It mirrors mort's
// SkillInvokerForPalette + AgentInvokerForPalette. With a nil Palette the
// palette entries are inert ("not configured" at first call).
type PaletteSource interface {
	// Skill delegation.
	ResolveSkill(ctx context.Context, callerID, name string) (skillID string, err error)
	InvokeSkill(
		ctx context.Context,
		callerID, channelID, name string,
		inputs map[string]any,
		parentRunID string,
	) (output, runID, status string, err error)

	// Sub-agent delegation.
	ResolveAgent(ctx context.Context, callerID, name string) (agentID string, err error)
	InvokeAgent(
		ctx context.Context,
		callerID, channelID, name string,
		prompt, parentRunID, modelTierOverride, promptPrepend string,
		toolsSubset []string,
		onEvent func(ctx context.Context, event, emoji string),
	) (output, runID, status string, err error)
}
|
||||||
@@ -0,0 +1,157 @@
|
|||||||
|
// Package run is executus's run kernel: the shared run-loop mechanics around
|
||||||
|
// majordomo's agent loop, plus the host seams (run.Ports / RunnableAgent) that
|
||||||
|
// let one executor serve every surface — a light host's bounded one-shot run,
|
||||||
|
// a heavy host's persona agent or saved skill — without the kernel importing a
|
||||||
|
// battery.
|
||||||
|
//
|
||||||
|
// This file holds the genuinely-identical scaffolding both run shapes need:
|
||||||
|
// context cancellation merging, the detached-cleanup timeout, the per-run
|
||||||
|
// progress accessor the self-status tool reads, the legacy `submit`
|
||||||
|
// compatibility tool (submit.go), the ancestor progress bridge (progress.go),
|
||||||
|
// and the run-finalizer machinery — one source of truth.
|
||||||
|
//
|
||||||
|
// The kernel depends only on majordomo + executus/tool + the run.Ports
|
||||||
|
// interfaces; persistence, audit, the persona/skill nouns, and the critic are
|
||||||
|
// host-supplied via Ports (see ports.go) so importing the kernel never drags in
|
||||||
|
// a store or a battery.
|
||||||
|
package run
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"log/slog"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
||||||
|
)
|
||||||
|
|
||||||
|
// ErrShutdown is the cancellation cause placed on mort's base lifecycle
// context when the process shuts down (SIGTERM after the drain window).
// The agent executor checks for it to tell apart a run that shutdown
// interrupted, which should stay durable-recoverable, from a run that
// errored or hit its own deadline, which is terminal.
var ErrShutdown = errors.New("mort: shutting down")
|
||||||
|
|
||||||
|
// CleanupContextTimeout bounds how long a run's post-completion cleanup
// (budget commit, audit Close, attachment bookkeeping) may wait on storage
// once it has detached from the caller's context, which may already be
// cancelled. 10s is generous for a single-row UPDATE against MySQL; anything
// longer suggests a hung connection that the run goroutine should not keep
// waiting on. Both executors derive their cleanup contexts as
// context.WithTimeout(context.WithoutCancel(ctx), CleanupContextTimeout).
const CleanupContextTimeout = 10 * time.Second
|
||||||
|
|
||||||
|
// Reserved lifecycle event keys for state-react, shared so that both nouns
// surface the same UX shape. The double-underscore wrapping makes an
// accidental collision with a tool name near-impossible.
const (
	StateReactStart          = "__start__"
	StateReactEnd            = "__end__"
	StateReactError          = "__error__"
	StateReactBudgetExceeded = "__budget_exceeded__"
)
|
||||||
|
|
||||||
|
// MergeCancellation derives a context from parent that is also cancelled
// when secondary is cancelled, carrying over the cancellation Cause of
// whichever input fired. It serves the lane preemption path (the lane's
// per-job ctx.Cause flows into the run context) and the runtime-detach path
// (process shutdown still reaches a run whose deadline was reset after a
// lane wait).
//
// Callers must always invoke the returned cancel func so the watcher
// goroutine is released. The merged context is also cancelled once when
// either input fires.
func MergeCancellation(parent, secondary context.Context) (context.Context, context.CancelFunc) {
	merged, cancelWithCause := context.WithCancelCause(parent)

	// watch forwards secondary's cancellation (and its cause) into merged,
	// and exits as soon as merged is done for any other reason.
	watch := func() {
		select {
		case <-secondary.Done():
			cancelWithCause(context.Cause(secondary))
		case <-merged.Done():
		}
	}
	go watch()

	release := func() { cancelWithCause(nil) }
	return merged, release
}
|
||||||
|
|
||||||
|
// RunFinalizer is called when a run finishes so that per-run tool state
// (open HTTP streams, per-run code_exec counters, per-run search budgets)
// is released and the process-lifetime maps keyed by run id do not grow
// without bound. Both executors fire their registered finalizers through
// FireFinalizers.
type RunFinalizer interface {
	FinalizeRun(runID string)
}

// FireFinalizers invokes every non-nil finalizer in fs for runID. Each call
// is wrapped in its own panic-recover, so a buggy finalizer can neither take
// down the run goroutine nor prevent the remaining finalizers from running.
// A nil or empty slice is a no-op.
func FireFinalizers(fs []RunFinalizer, runID string) {
	// invoke runs a single finalizer and turns a panic into an error log.
	invoke := func(fin RunFinalizer) {
		defer func() {
			if recovered := recover(); recovered != nil {
				slog.Error("runengine: run finalizer panicked",
					"run_id", runID, "panic", recovered)
			}
		}()
		fin.FinalizeRun(runID)
	}

	for i := range fs {
		if fs[i] != nil {
			invoke(fs[i])
		}
	}
}
|
||||||
|
|
||||||
|
// RunTally is the narrow live-progress source read by RunStateAccessor: the
// running token and tool-call counts of the in-flight run. The audit
// battery's writer satisfies it. Reading tallies through this interface lets
// the run kernel avoid importing the audit package (the inversion of mort's
// direct *skillaudit.Writer dependency).
type RunTally interface {
	// TokenStats reports the running totals of input, output, and thinking
	// tokens.
	TokenStats() (in, out, thinking int64)

	// ToolCallsCount reports how many tool calls have executed so far.
	ToolCallsCount() int
}
|
||||||
|
|
||||||
|
// RunStateAccessor is the per-run live-progress accessor the executor
|
||||||
|
// stamps on Invocation.RunState before building the toolbox, so the
|
||||||
|
// self-status tool can report iteration / tool-calls / tokens / elapsed for
|
||||||
|
// the in-flight run. Construct with NewRunStateAccessor; the executor's step
|
||||||
|
// observer calls SetIteration each loop.
|
||||||
|
type RunStateAccessor struct {
|
||||||
|
tally RunTally
|
||||||
|
iter atomic.Int32
|
||||||
|
maxIter int
|
||||||
|
maxCalls int
|
||||||
|
startedAt time.Time
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRunStateAccessor builds the accessor. writer supplies the live token
|
||||||
|
// + tool-call tallies; maxIter / maxCalls are the reported caps (0 =
|
||||||
|
// uncapped); startedAt anchors the elapsed clock.
|
||||||
|
func NewRunStateAccessor(tally RunTally, maxIter, maxCalls int, startedAt time.Time) *RunStateAccessor {
|
||||||
|
return &RunStateAccessor{
|
||||||
|
tally: tally,
|
||||||
|
maxIter: maxIter,
|
||||||
|
maxCalls: maxCalls,
|
||||||
|
startedAt: startedAt,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetIteration records the current agent-loop iteration (called from the
|
||||||
|
// executor's step observer).
|
||||||
|
func (a *RunStateAccessor) SetIteration(iter int) { a.iter.Store(int32(iter)) }
|
||||||
|
|
||||||
|
// RunState satisfies tool.RunStateAccessor.
|
||||||
|
func (a *RunStateAccessor) RunState() tool.RunState {
|
||||||
|
in, out, think := a.tally.TokenStats()
|
||||||
|
return tool.RunState{
|
||||||
|
Iteration: int(a.iter.Load()),
|
||||||
|
MaxIterations: a.maxIter,
|
||||||
|
ToolCalls: a.tally.ToolCallsCount(),
|
||||||
|
MaxToolCalls: a.maxCalls,
|
||||||
|
InputTokens: in,
|
||||||
|
OutputTokens: out,
|
||||||
|
ThinkingTokens: think,
|
||||||
|
ElapsedSeconds: int(time.Since(a.startedAt).Seconds()),
|
||||||
|
}
|
||||||
|
}
|
||||||
+419
@@ -0,0 +1,419 @@
|
|||||||
|
package run
|
||||||
|
|
||||||
|
// steps.go — the per-run step emitter and the tool→step presentation
|
||||||
|
// mapping. This is the single place that turns the executor's post-step
|
||||||
|
// observer into ordered tool.Step records: one per tool call, each with a
|
||||||
|
// stable id, an open-vocabulary kind, and a human present-tense summary
|
||||||
|
// that flips running→complete/error.
|
||||||
|
//
|
||||||
|
// One source feeds two consumers (mirroring the OnEvent/OnToolEvent/
|
||||||
|
// PostRunResult pattern): the live tool.Invocation.OnStep callback
|
||||||
|
// (nil-safe) AND snapshot(), which the executor copies onto Result.Steps.
|
||||||
|
// Because the Result accumulation does not depend on OnStep being set,
|
||||||
|
// every surface — chat (JSON + SSE), Discord, cron, sub-agents — carries
|
||||||
|
// steps; OnStep is needed only for live streaming.
|
||||||
|
//
|
||||||
|
// LIMITATION (current): majordomo exposes only a POST-step observer, so
|
||||||
|
// the executor calls toolStart+toolEnd back-to-back after each tool has
|
||||||
|
// already run. Steps are therefore recorded faithfully, but step.StartedAt
|
||||||
|
// ≈ EndedAt and the intermediate "running" phase is never observable to a
|
||||||
|
// live OnStep consumer. A pre-dispatch hook (wrapping each tool's handler
|
||||||
|
// to emit toolStart before execution, like mort's state-react decorator)
|
||||||
|
// is a follow-up that would restore real start timing + the running phase.
|
||||||
|
// The emitter already supports that two-call shape — toolStart and toolEnd
|
||||||
|
// are separate methods — so wiring it later is additive.
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"net/url"
|
||||||
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
||||||
|
)
|
||||||
|
|
||||||
|
// stepSummaryMaxLen is the upper bound on the length of a step's human
// summary (section G size cap). There is no matching cap for Detail yet:
// Detail is unused in v1 because no live detail source exists while replies
// are generated blocking.
const stepSummaryMaxLen = 200
|
||||||
|
|
||||||
|
// stepEmitter accumulates ordered steps for one run and fires the live
|
||||||
|
// OnStep callback.
|
||||||
|
//
|
||||||
|
// Concurrency: touched ONLY from the agent-loop goroutine — the executor's
|
||||||
|
// stepObserver (and, once a pre-dispatch hook is wired, that hook) run
|
||||||
|
// there; majordomo executes a step's tool calls sequentially, and
|
||||||
|
// sub-agents build their own Invocation so they never reach this
|
||||||
|
// emitter. Same single-goroutine contract as the audit Writer and the
|
||||||
|
// critic ProgressRecorder — no internal lock.
|
||||||
|
type stepEmitter struct {
|
||||||
|
onStep func(ctx context.Context, ev tool.StepEvent)
|
||||||
|
now func() time.Time
|
||||||
|
|
||||||
|
seq int
|
||||||
|
steps []tool.Step // ordered; the snapshot for Result.Steps
|
||||||
|
byID map[string]int // step id -> index into steps
|
||||||
|
pending map[string][]string // correlation key -> queued running ids (FIFO)
|
||||||
|
}
|
||||||
|
|
||||||
|
// newStepEmitter returns an emitter that forwards to onStep (nil-safe).
|
||||||
|
func newStepEmitter(onStep func(ctx context.Context, ev tool.StepEvent)) *stepEmitter {
|
||||||
|
return &stepEmitter{
|
||||||
|
onStep: onStep,
|
||||||
|
now: time.Now,
|
||||||
|
byID: map[string]int{},
|
||||||
|
pending: map[string][]string{},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// corrKey builds the key that pairs a "start" with its later "end". The
// pre-dispatch hook only knows the tool name and raw args (no call id),
// while the stepObserver sees the full call with the same raw args, so the
// key is the name and the raw args joined by a NUL byte.
func corrKey(name string, args json.RawMessage) string {
	const sep = "\x00"
	return strings.Join([]string{name, string(args)}, sep)
}
|
||||||
|
|
||||||
|
// toolStart records + emits the "start" of a tool call. Called from the
|
||||||
|
// pre-dispatch hookToolbox closure, before the tool runs.
|
||||||
|
func (e *stepEmitter) toolStart(ctx context.Context, name string, args json.RawMessage) {
|
||||||
|
if e == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
step := e.newStep(name, args)
|
||||||
|
key := corrKey(name, args)
|
||||||
|
e.pending[key] = append(e.pending[key], step.ID)
|
||||||
|
e.fire(ctx, "start", step)
|
||||||
|
}
|
||||||
|
|
||||||
|
// toolEnd records + emits the terminal "end" of a tool call. Called from
|
||||||
|
// the stepObserver for each completed tool call. If no matching start was
|
||||||
|
// seen (e.g. a tool with a nil handler the pre-dispatch hook skipped), a
|
||||||
|
// start is synthesized so the step still appears.
|
||||||
|
func (e *stepEmitter) toolEnd(ctx context.Context, call llm.ToolCall, result string, isError bool) {
|
||||||
|
if e == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
id := e.matchPending(call.Name, call.Arguments)
|
||||||
|
if id == "" {
|
||||||
|
id = e.newStep(call.Name, call.Arguments).ID
|
||||||
|
}
|
||||||
|
idx, ok := e.byID[id]
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
step := &e.steps[idx]
|
||||||
|
end := e.now()
|
||||||
|
step.EndedAt = &end
|
||||||
|
if isError {
|
||||||
|
step.Status = "error"
|
||||||
|
} else {
|
||||||
|
step.Status = "complete"
|
||||||
|
}
|
||||||
|
if s := summaryForEnd(call.Name, call.Arguments, result, isError); s != "" {
|
||||||
|
step.Summary = s
|
||||||
|
}
|
||||||
|
e.fire(ctx, "end", *step)
|
||||||
|
}
|
||||||
|
|
||||||
|
// newStep mints + appends a running step and returns it (by value).
|
||||||
|
func (e *stepEmitter) newStep(name string, args json.RawMessage) tool.Step {
|
||||||
|
e.seq++
|
||||||
|
step := tool.Step{
|
||||||
|
ID: fmt.Sprintf("s%d", e.seq),
|
||||||
|
Kind: kindForTool(name),
|
||||||
|
Title: name,
|
||||||
|
Summary: summaryForStart(name, args),
|
||||||
|
Status: "running",
|
||||||
|
StartedAt: e.now(),
|
||||||
|
}
|
||||||
|
e.byID[step.ID] = len(e.steps)
|
||||||
|
e.steps = append(e.steps, step)
|
||||||
|
return step
|
||||||
|
}
|
||||||
|
|
||||||
|
// matchPending pops the oldest running step id for (name, args). Falls back to
|
||||||
|
// the OLDEST still-running step of the same tool name when the args don't
|
||||||
|
// byte-match between start and end (e.g. JSON key reordering). FIFO on the
|
||||||
|
// fallback too, consistent with the per-key queue pop above — closing the
|
||||||
|
// oldest avoids mis-correlating concurrent same-named calls. Returns "" on no
|
||||||
|
// match.
|
||||||
|
func (e *stepEmitter) matchPending(name string, args json.RawMessage) string {
|
||||||
|
key := corrKey(name, args)
|
||||||
|
if q := e.pending[key]; len(q) > 0 {
|
||||||
|
id := q[0]
|
||||||
|
if len(q) == 1 {
|
||||||
|
delete(e.pending, key)
|
||||||
|
} else {
|
||||||
|
e.pending[key] = q[1:]
|
||||||
|
}
|
||||||
|
return id
|
||||||
|
}
|
||||||
|
for i := 0; i < len(e.steps); i++ {
|
||||||
|
if e.steps[i].Title == name && e.steps[i].Status == "running" {
|
||||||
|
return e.steps[i].ID
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
func (e *stepEmitter) fire(ctx context.Context, phase string, step tool.Step) {
|
||||||
|
if e.onStep == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
e.onStep(ctx, tool.StepEvent{Phase: phase, Step: step})
|
||||||
|
}
|
||||||
|
|
||||||
|
// snapshot returns a copy of the ordered, deduplicated step set for the
|
||||||
|
// run Result. A step still "running" at run end (e.g. the run was
|
||||||
|
// cancelled mid-tool-call) is reported as-is.
|
||||||
|
func (e *stepEmitter) snapshot() []tool.Step {
|
||||||
|
if e == nil || len(e.steps) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
out := make([]tool.Step, len(e.steps))
|
||||||
|
copy(out, e.steps)
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
// kindForTool maps a tool name onto an open-vocabulary step kind. A tool
// that is not listed is never an error: names that start with "image" or
// contain "draw" become "image", and everything else becomes the generic
// "tool" kind (the client maps unknown kinds to a default icon). The list
// loosely tracks the catalog in pkg/skilltools/CLAUDE.md.
func kindForTool(name string) string {
	switch name {
	case "think":
		return "thinking"
	case "code_exec", "calculate":
		return "code"
	case "web_search", "search_reddit", "wikipedia_summary":
		return "search"
	case "file_save", "file_get", "file_list", "file_delete", "file_search":
		return "file"
	case "kv_get", "kv_set", "kv_list", "kv_delete",
		"remember", "recall", "chatbot_get_memories":
		return "memory"
	case "read_page", "read_pdf", "read_reddit", "read_video", "verify_url",
		"summary_summarise", "summarize", "file_get_text", "file_get_metadata",
		"http_get", "http_post", "http_get_stream", "http_stream_read":
		return "read"
	case "query", "query_research", "deepresearch", "animate",
		"agent_invoke", "agent_invoke_parallel", "agent_spawn",
		"agent_spawn_parallel", "skill_invoke", "skill_invoke_parallel":
		return "delegate"
	}

	// Not in the catalog: apply the loose image heuristic, else go generic.
	if strings.Contains(name, "draw") || strings.HasPrefix(name, "image") {
		return "image"
	}
	return "tool"
}
|
||||||
|
|
||||||
|
// summaryForStart builds the human present-tense running summary. It
|
||||||
|
// derives specifics from safe arg fields only; secret-bearing tools
|
||||||
|
// (mcp_call, email_send, http_*) are summarized without echoing args.
|
||||||
|
func summaryForStart(name string, args json.RawMessage) string {
|
||||||
|
var s string
|
||||||
|
switch name {
|
||||||
|
case "web_search":
|
||||||
|
if q := argString(args, "query", "q"); q != "" {
|
||||||
|
s = fmt.Sprintf("Searching the web for %q", q)
|
||||||
|
} else {
|
||||||
|
s = "Searching the web"
|
||||||
|
}
|
||||||
|
case "search_reddit":
|
||||||
|
if q := argString(args, "query", "q"); q != "" {
|
||||||
|
s = fmt.Sprintf("Searching Reddit for %q", q)
|
||||||
|
} else {
|
||||||
|
s = "Searching Reddit"
|
||||||
|
}
|
||||||
|
case "wikipedia_summary":
|
||||||
|
if q := argString(args, "query", "title"); q != "" {
|
||||||
|
s = fmt.Sprintf("Looking up %q on Wikipedia", q)
|
||||||
|
} else {
|
||||||
|
s = "Looking up Wikipedia"
|
||||||
|
}
|
||||||
|
case "read_page", "read_pdf", "read_reddit", "read_video", "verify_url":
|
||||||
|
if u := argString(args, "url", "post", "page"); u != "" {
|
||||||
|
s = "Reading " + hostOf(u)
|
||||||
|
} else {
|
||||||
|
s = "Reading a page"
|
||||||
|
}
|
||||||
|
case "http_get", "http_post", "http_get_stream":
|
||||||
|
// Show host only — a full URL can embed credentials/tokens.
|
||||||
|
if u := argString(args, "url"); u != "" {
|
||||||
|
s = "Fetching " + hostOf(u)
|
||||||
|
} else {
|
||||||
|
s = "Making an HTTP request"
|
||||||
|
}
|
||||||
|
case "summary_summarise", "summarize":
|
||||||
|
s = "Summarizing text"
|
||||||
|
case "translate":
|
||||||
|
if lang := argString(args, "target_lang", "target_language", "lang"); lang != "" {
|
||||||
|
s = "Translating to " + lang
|
||||||
|
} else {
|
||||||
|
s = "Translating text"
|
||||||
|
}
|
||||||
|
case "code_exec":
|
||||||
|
s = "Running code"
|
||||||
|
case "calculate":
|
||||||
|
if q := argString(args, "query", "expression", "expr"); q != "" {
|
||||||
|
s = "Calculating " + truncateStep(q, 60)
|
||||||
|
} else {
|
||||||
|
s = "Calculating"
|
||||||
|
}
|
||||||
|
case "remember":
|
||||||
|
// Never echo the stored value.
|
||||||
|
s = "Saving a memory"
|
||||||
|
case "recall", "chatbot_get_memories":
|
||||||
|
s = "Recalling memories"
|
||||||
|
case "kv_get", "kv_list":
|
||||||
|
s = "Reading saved data"
|
||||||
|
case "kv_set":
|
||||||
|
s = "Saving data"
|
||||||
|
case "kv_delete":
|
||||||
|
s = "Deleting saved data"
|
||||||
|
case "file_save":
|
||||||
|
if n := argString(args, "name", "filename"); n != "" {
|
||||||
|
s = "Saving file " + truncateStep(n, 60)
|
||||||
|
} else {
|
||||||
|
s = "Saving a file"
|
||||||
|
}
|
||||||
|
case "file_get", "file_get_text", "file_get_metadata":
|
||||||
|
s = "Reading a file"
|
||||||
|
case "file_list", "file_search":
|
||||||
|
s = "Listing files"
|
||||||
|
case "query", "query_research":
|
||||||
|
if q := argString(args, "query", "question", "prompt", "task"); q != "" {
|
||||||
|
s = "Researching " + truncateStep(q, 80)
|
||||||
|
} else {
|
||||||
|
s = "Researching"
|
||||||
|
}
|
||||||
|
case "deepresearch":
|
||||||
|
s = "Running deep research"
|
||||||
|
case "animate":
|
||||||
|
s = "Generating an animation"
|
||||||
|
case "agent_invoke", "agent_spawn":
|
||||||
|
if a := argString(args, "agent", "agent_name", "name"); a != "" {
|
||||||
|
s = "Delegating to " + a
|
||||||
|
} else {
|
||||||
|
s = "Delegating to a sub-agent"
|
||||||
|
}
|
||||||
|
case "agent_invoke_parallel", "agent_spawn_parallel":
|
||||||
|
s = "Delegating to sub-agents"
|
||||||
|
case "skill_invoke":
|
||||||
|
if sk := argString(args, "skill_name", "skill", "name"); sk != "" {
|
||||||
|
s = "Running skill " + sk
|
||||||
|
} else {
|
||||||
|
s = "Running a skill"
|
||||||
|
}
|
||||||
|
case "skill_invoke_parallel":
|
||||||
|
s = "Running skills in parallel"
|
||||||
|
case "think":
|
||||||
|
s = "Thinking"
|
||||||
|
case "mcp_call":
|
||||||
|
// Redact: MCP args frequently carry secrets. Name server/tool only.
|
||||||
|
srv, tl := argString(args, "server"), argString(args, "tool")
|
||||||
|
switch {
|
||||||
|
case srv != "" && tl != "":
|
||||||
|
s = fmt.Sprintf("Calling %s/%s", srv, tl)
|
||||||
|
case srv != "":
|
||||||
|
s = "Calling " + srv
|
||||||
|
default:
|
||||||
|
s = "Calling an MCP tool"
|
||||||
|
}
|
||||||
|
case "email_send":
|
||||||
|
// Redact recipients + body.
|
||||||
|
s = "Sending an email"
|
||||||
|
default:
|
||||||
|
s = "Using " + name
|
||||||
|
}
|
||||||
|
return truncateStep(s, stepSummaryMaxLen)
|
||||||
|
}
|
||||||
|
|
||||||
|
// summaryForEnd optionally upgrades the summary to a cheap result phrase.
|
||||||
|
// Returns "" to keep the running summary (the caller then just flips the
|
||||||
|
// status). Never returns a phrase derived from raw result bytes.
|
||||||
|
func summaryForEnd(name string, _ json.RawMessage, result string, isError bool) string {
|
||||||
|
if isError {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
switch name {
|
||||||
|
case "web_search", "search_reddit":
|
||||||
|
if n := countResults(result); n >= 0 {
|
||||||
|
return fmt.Sprintf("Found %d result%s", n, plural(n))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
// argString decodes a tool's raw JSON args as an object and returns the
// trimmed value of the first key, in the order given, that holds a
// non-blank string. It returns "" when args is empty, is not valid JSON for
// an object, or none of the keys qualifies.
func argString(args json.RawMessage, keys ...string) string {
	if len(args) == 0 {
		return ""
	}
	var fields map[string]any
	if json.Unmarshal(args, &fields) != nil {
		return ""
	}
	for _, key := range keys {
		text, isString := fields[key].(string)
		if !isString {
			continue
		}
		if trimmed := strings.TrimSpace(text); trimmed != "" {
			return trimmed
		}
	}
	return ""
}
|
||||||
|
|
||||||
|
// countResults reads a v11-style {"results":[...]} envelope and returns how
// many entries the array holds. It returns -1 when the input is blank, is
// not valid JSON for that shape, or has no (or a null) "results" field.
func countResults(result string) int {
	const noMatch = -1
	if len(strings.TrimSpace(result)) == 0 {
		return noMatch
	}
	var envelope struct {
		Results []json.RawMessage `json:"results"`
	}
	err := json.Unmarshal([]byte(result), &envelope)
	if err != nil {
		return noMatch
	}
	if envelope.Results == nil {
		return noMatch
	}
	return len(envelope.Results)
}
|
||||||
|
|
||||||
|
// hostOf returns the bare host (without a leading "www.") of a URL. The
// host may carry a port, but never userinfo, path, or query.
//
// When raw does not parse as a URL with a host, a short form of the raw
// string is returned. If that string contains no whitespace it is treated
// as URL-like and its query, fragment, and userinfo are stripped first, so
// credentials or tokens embedded in a scheme-less or malformed URL are not
// echoed. Strings with whitespace (titles, free text) are only truncated.
func hostOf(raw string) string {
	if u, err := url.Parse(raw); err == nil && u.Host != "" {
		return strings.TrimPrefix(u.Host, "www.")
	}
	if !strings.ContainsAny(raw, " \t\r\n") {
		raw = redactURLish(raw)
	}
	return truncateStep(raw, 60)
}

// redactURLish drops the query, fragment, and userinfo from a URL-like
// string that url.Parse could not resolve to a host.
func redactURLish(s string) string {
	if i := strings.IndexAny(s, "?#"); i >= 0 {
		s = s[:i]
	}
	scheme := ""
	if i := strings.Index(s, "://"); i >= 0 {
		scheme, s = s[:i+3], s[i+3:]
	}
	authority, rest := s, ""
	if i := strings.IndexByte(s, '/'); i >= 0 {
		authority, rest = s[:i], s[i:]
	}
	// Everything up to the last '@' of the authority is userinfo.
	if i := strings.LastIndexByte(authority, '@'); i >= 0 {
		authority = authority[i+1:]
	}
	return scheme + authority + rest
}

// truncateStep caps s at max runes without splitting a rune. A string that
// is cut ends in an ellipsis that counts toward max. It returns "" when max
// is not positive, and the first rune alone when max is 1.
func truncateStep(s string, max int) string {
	if max <= 0 {
		return ""
	}
	r := []rune(s)
	if len(r) <= max {
		return s
	}
	if max == 1 {
		return string(r[:1])
	}
	return string(r[:max-1]) + "…"
}
|
||||||
|
|
||||||
|
// plural returns the English plural suffix for a count: "" when n is
// exactly 1, and "s" for every other value.
func plural(n int) string {
	suffix := "s"
	if n == 1 {
		suffix = ""
	}
	return suffix
}
|
||||||
@@ -0,0 +1,84 @@
|
|||||||
|
package run
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
llm "gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||||
|
)
|
||||||
|
|
||||||
|
// SubmitCapture holds the output that a run's `submit` tool received.
//
// Background: legacy agentkit injected a synthetic `submit` tool and ended
// the loop when it fired, and years of mort system prompts (agent YAMLs,
// skill manifests, the executors' platform headers) teach the model to
// "call submit with your final answer". majordomo's agent loop has no
// submit concept: it ends when the model replies WITHOUT tool calls.
// Dropping submit cold would make every prompt-trained model burn turns on
// "unknown tool \"submit\"" errors.
//
// Compatibility shape: the executors add NewSubmitTool's tool to every
// run's toolset (unless the palette already defines a `submit`). The
// handler records the FIRST submitted answer and tells the model that the
// answer was accepted, so that its next turn is a bare reply, which ends
// the loop naturally. After the run the executor consults
// Output(loopOutput, runErr): a captured submission wins over an empty or
// budget-exhausted ending, so a model that submits on its final allowed
// step still yields its answer instead of ErrMaxSteps.
type SubmitCapture struct {
	mu     sync.Mutex // guards output and called
	output string     // the first submitted answer
	called bool       // whether submit has fired
}

// Record keeps the first submitted answer and ignores every later call,
// matching legacy agentkit's "multiple calls keep the first" contract.
func (c *SubmitCapture) Record(output string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !c.called {
		c.output, c.called = output, true
	}
}

// Submitted reports the captured answer and whether submit has fired.
func (c *SubmitCapture) Submitted() (string, bool) {
	c.mu.Lock()
	answer, fired := c.output, c.called
	c.mu.Unlock()
	return answer, fired
}

// Output resolves the final output of the run. When the model called
// submit, the submitted answer is returned with a nil error (parity with
// legacy agentkit, where submit's argument WAS the run output): a run that
// submitted and then ran out of steps, or timed out composing the courtesy
// confirmation turn, is a SUCCESS. Otherwise loopOutput and runErr are
// passed through unchanged.
func (c *SubmitCapture) Output(loopOutput string, runErr error) (output string, resolvedErr error) {
	answer, fired := c.Submitted()
	if !fired {
		return loopOutput, runErr
	}
	return answer, nil
}
|
||||||
|
|
||||||
|
// submitArgs is the argument schema of the compatibility `submit` tool. It
// mirrors legacy agentkit's synthetic submit tool so that models prompted
// under the old contract emit compatible calls.
type submitArgs struct {
	Output string `json:"output" description:"The final answer, summary, or output for this task."`
}
|
||||||
|
|
||||||
|
// NewSubmitTool builds the compatibility `submit` tool bound to the
|
||||||
|
// given capture. Both executors (skill + agent) install one per run.
|
||||||
|
func NewSubmitTool(capture *SubmitCapture) llm.Tool {
|
||||||
|
return llm.DefineTool[submitArgs](
|
||||||
|
"submit",
|
||||||
|
"Submit your final answer or output to end this task. Call exactly once when you are done.",
|
||||||
|
func(_ context.Context, args submitArgs) (any, error) {
|
||||||
|
capture.Record(strings.TrimSpace(args.Output))
|
||||||
|
return "Final answer recorded. Do not call any more tools; reply now with a brief closing message.", nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user