From aab950f1c38a1b82ff2e6c06bef5aedf149d9fb6 Mon Sep 17 00:00:00 2001 From: Steve Dudenhoeffer Date: Fri, 26 Jun 2026 19:58:20 -0400 Subject: [PATCH 1/8] P2 (foundation): run-loop mechanics + RunnableAgent DTO MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Stand up the executus/run kernel foundation, decoupled from mort: - runengine.go: the shared run-loop scaffolding (MergeCancellation, CleanupContextTimeout, RunFinalizer/FireFinalizers, RunStateAccessor) moved from mort. The accessor's *skillaudit.Writer dependency is inverted to a narrow run.RunTally interface (TokenStats + ToolCallsCount) — the kernel reads live tallies without importing the audit battery. - submit.go: the legacy submit-capture compat tool (stdlib + majordomo/llm). - agent.go: RunnableAgent DTO — the kernel's view of "a thing to run" (tier, prompt, caps, palette, phases, critic config). The persona Agent and saved Skill will LOWER into this DTO so the kernel never imports a noun battery. This is the spine of the agentexec.Run(*agents.Agent) inversion. run/ builds with only majordomo + executus/tool. The executor merge (agentexec+skillexec -> run.Executor) and the nil-safe run.Ports (Audit/Critic/Budget/Checkpointer/PaletteSource) are the next P2 block. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- CLAUDE.md | 6 +- run/agent.go | 76 +++++++++++++++++++++++ run/runengine.go | 157 +++++++++++++++++++++++++++++++++++++++++++++++ run/submit.go | 84 +++++++++++++++++++++++++ 4 files changed, 321 insertions(+), 2 deletions(-) create mode 100644 run/agent.go create mode 100644 run/runengine.go create mode 100644 run/submit.go diff --git a/CLAUDE.md b/CLAUDE.md index dcc609e..f9b1dca 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -43,8 +43,10 @@ CORE (majordomo + stdlib): fanout/ programmatic N×M swarm [P0 ✓] deliver/ output egress seam (+ Discard/Stdout) [P0 ✓] identity/ caller identity seams [P0 ✓] - run/ progress bridge now; the executor kernel + [P0 partial] - nil-safe Ports + RunnableAgent later [P2] + run/ run-loop mechanics (cancel-merge, finalizers, [P2 wip] + RunStateAccessor via RunTally seam, submit, + progress bridge) + RunnableAgent DTO done; + executor merge + nil-safe run.Ports next [P2] dispatchguard/ loop/depth/fan-out caps [P0 ✓] pendingattach/ attachment dedupe [P0 ✓] tool/ registry + 3-stage permissions + ssrf [P1 ✓] diff --git a/run/agent.go b/run/agent.go new file mode 100644 index 0000000..18a0a98 --- /dev/null +++ b/run/agent.go @@ -0,0 +1,76 @@ +package run + +import "time" + +// RunnableAgent is the kernel's view of "a thing to run": an identity, a model +// tier, a system prompt, execution caps, and a tool palette. It is a plain DTO +// on purpose — the run kernel never imports a noun battery. The persona Agent +// and the saved Skill each LOWER themselves into a RunnableAgent (a ToRunnable +// method on the battery side), and the kernel runs the DTO. This is the +// inversion of mort's agentexec.Executor.Run(*agents.Agent): the executor no +// longer depends on the persona struct, only on this shape. 
+// +// A light host can build a RunnableAgent inline (model tier + prompt + a few +// tool names) for a one-shot bounded run, with no persona or skill battery at +// all — that is exactly gadfly's swarm task. +type RunnableAgent struct { + // ID is a stable identifier for the run subject (an agent/skill UUID, or + // any host-chosen id). Used for audit attribution and dispatch-guard + // genealogy. Empty is allowed for anonymous one-shot runs. + ID string + + // Name is a human label (audit/logs/delivery). Empty is allowed. + Name string + + // SystemPrompt is the agent's base system prompt (before per-run + // personalization, which a host layers via Ports). + SystemPrompt string + + // ModelTier is a tier alias or concrete spec resolved through + // model.ParseModelForContext. Empty resolves to the host's default tier. + ModelTier string + + // MaxIterations caps the agent loop's tool-dispatch steps. 0 = kernel + // default. MaxRuntime caps wall-clock for the whole run (the kernel starts + // this clock AFTER any lane dequeue, not at submission). 0 = kernel + // default. + MaxIterations int + MaxRuntime time.Duration + + // LowLevelTools are tool-registry names the run may call directly. + // SkillPalette / SubAgentPalette name saved skills / sub-agents exposed as + // skill__ / agent__ delegation tools, resolved through + // Ports.Palette (nil Palette => those entries are inert). + LowLevelTools []string + SkillPalette []string + SubAgentPalette []string + + // Phases optionally model a multi-step pipeline (each phase its own prompt + // + tier + tools). An empty slice is a single-phase run — the common case. + Phases []Phase + + // Critic configures the optional two-tier run-critic (Ports.Critic). The + // zero value (disabled) is the light-host default. + Critic CriticConfig +} + +// Phase is one step of a multi-step run: its own system prompt, model tier, +// iteration cap, and tool subset. 
Optional phases may be skipped by the +// pipeline when their precondition isn't met. +type Phase struct { + Name string + SystemPrompt string + ModelTier string + MaxIterations int + Tools []string + Optional bool +} + +// CriticConfig configures the optional run-critic. Enabled gates whether a +// critic monitor is started at all; BackstopMultiplier sets the hard-kill +// deadline as a multiple of the soft trigger (MaxRuntime). A non-positive +// multiplier uses the kernel default. +type CriticConfig struct { + Enabled bool + BackstopMultiplier float64 +} diff --git a/run/runengine.go b/run/runengine.go new file mode 100644 index 0000000..2b311e4 --- /dev/null +++ b/run/runengine.go @@ -0,0 +1,157 @@ +// Package run is executus's run kernel: the shared run-loop mechanics around +// majordomo's agent loop, plus the host seams (run.Ports / RunnableAgent) that +// let one executor serve every surface — a light host's bounded one-shot run, +// a heavy host's persona agent or saved skill — without the kernel importing a +// battery. +// +// This file holds the genuinely-identical scaffolding both run shapes need: +// context cancellation merging, the detached-cleanup timeout, the per-run +// progress accessor the self-status tool reads, the legacy `submit` +// compatibility tool (submit.go), the ancestor progress bridge (progress.go), +// and the run-finalizer machinery — one source of truth. +// +// The kernel depends only on majordomo + executus/tool + the run.Ports +// interfaces; persistence, audit, the persona/skill nouns, and the critic are +// host-supplied via Ports (see ports.go) so importing the kernel never drags in +// a store or a battery. +package run + +import ( + "context" + "errors" + "log/slog" + "sync/atomic" + "time" + + "gitea.stevedudenhoeffer.com/steve/executus/tool" +) + +// ErrShutdown is the cancellation cause set on mort's base lifecycle context +// when the process is shutting down (SIGTERM after the drain window). 
The +// agent executor uses it to distinguish a run interrupted by shutdown (which +// should be left durable-recoverable) from a run that errored or hit its own +// deadline (terminal). +var ErrShutdown = errors.New("mort: shutting down") + +// CleanupContextTimeout caps how long a run's post-completion cleanup ops +// (budget commit, audit Close, attachment bookkeeping) may wait on +// storage after detaching from the caller's — possibly already +// cancelled — context. 10s is generous for a single-row UPDATE against +// MySQL; longer suggests a hung connection the run goroutine shouldn't +// keep waiting on. Both executors derive their cleanup contexts as +// context.WithTimeout(context.WithoutCancel(ctx), CleanupContextTimeout). +const CleanupContextTimeout = 10 * time.Second + +// Reserved state-react lifecycle event keys, shared so both nouns surface +// the same UX shape. Namespaced with double-underscores to make accidental +// collision with a tool name near-impossible. +const ( + StateReactStart = "__start__" + StateReactEnd = "__end__" + StateReactError = "__error__" + StateReactBudgetExceeded = "__budget_exceeded__" +) + +// MergeCancellation returns a context cancelled when EITHER input is +// cancelled, propagating the cancellation Cause from whichever fired. Used +// by the lane preemption path (the lane's per-job ctx.Cause flows into the +// run context) and by the runtime-detach path (process shutdown still +// reaches a run whose deadline was reset after a lane wait). Always call +// the returned cancel to release the watcher goroutine; it is also invoked +// once when either input fires. 
+func MergeCancellation(parent, secondary context.Context) (context.Context, context.CancelFunc) { + merged, cancel := context.WithCancelCause(parent) + go func() { + select { + case <-merged.Done(): + return + case <-secondary.Done(): + cancel(context.Cause(secondary)) + } + }() + return merged, func() { cancel(nil) } +} + +// RunFinalizer is invoked at run finish so per-run tool state (open HTTP +// streams, per-run code_exec counters, per-run search budgets) is released +// and the process-lifetime maps keyed by run id don't grow unbounded. +// Both executors fire their registered finalizers via FireFinalizers. +type RunFinalizer interface { + FinalizeRun(runID string) +} + +// FireFinalizers runs every finalizer for runID, isolating each behind a +// panic-recover so one buggy finalizer can't take down the run goroutine +// or skip the others. Safe to call with a nil/empty slice. +func FireFinalizers(fs []RunFinalizer, runID string) { + for _, f := range fs { + if f == nil { + continue + } + func() { + defer func() { + if r := recover(); r != nil { + slog.Error("runengine: run finalizer panicked", + "run_id", runID, "panic", r) + } + }() + f.FinalizeRun(runID) + }() + } +} + +// RunTally is the narrow live-progress source the RunStateAccessor reads — +// the running token and tool-call counts for the in-flight run. The audit +// battery's writer satisfies it; this interface is how the run kernel reads +// live tallies without importing the audit package (the inversion of mort's +// direct *skillaudit.Writer dependency). +type RunTally interface { + // TokenStats returns the running input, output, and thinking token totals. + TokenStats() (in, out, thinking int64) + // ToolCallsCount returns the number of tool calls executed so far. 
+ ToolCallsCount() int +} + +// RunStateAccessor is the per-run live-progress accessor the executor +// stamps on Invocation.RunState before building the toolbox, so the +// self-status tool can report iteration / tool-calls / tokens / elapsed for +// the in-flight run. Construct with NewRunStateAccessor; the executor's step +// observer calls SetIteration each loop. +type RunStateAccessor struct { + tally RunTally + iter atomic.Int32 + maxIter int + maxCalls int + startedAt time.Time +} + +// NewRunStateAccessor builds the accessor. tally supplies the live token +// + tool-call tallies; maxIter / maxCalls are the reported caps (0 = +// uncapped); startedAt anchors the elapsed clock. +func NewRunStateAccessor(tally RunTally, maxIter, maxCalls int, startedAt time.Time) *RunStateAccessor { + return &RunStateAccessor{ + tally: tally, + maxIter: maxIter, + maxCalls: maxCalls, + startedAt: startedAt, + } +} + +// SetIteration records the current agent-loop iteration (called from the +// executor's step observer). +func (a *RunStateAccessor) SetIteration(iter int) { a.iter.Store(int32(iter)) } + +// RunState satisfies tool.RunStateAccessor. +func (a *RunStateAccessor) RunState() tool.RunState { + in, out, think := a.tally.TokenStats() + return tool.RunState{ + Iteration: int(a.iter.Load()), + MaxIterations: a.maxIter, + ToolCalls: a.tally.ToolCallsCount(), + MaxToolCalls: a.maxCalls, + InputTokens: in, + OutputTokens: out, + ThinkingTokens: think, + ElapsedSeconds: int(time.Since(a.startedAt).Seconds()), + } +} diff --git a/run/submit.go b/run/submit.go new file mode 100644 index 0000000..59ae221 --- /dev/null +++ b/run/submit.go @@ -0,0 +1,84 @@ +package run + +import ( + "context" + "strings" + "sync" + + llm "gitea.stevedudenhoeffer.com/steve/majordomo/llm" +) + +// SubmitCapture records the output a run's `submit` tool received.
+// +// Why this exists: legacy agentkit injected a synthetic `submit` tool and +// ended the loop when it fired; years of mort system prompts (agent +// YAMLs, skill manifests, the executors' platform headers) teach the +// model to "call submit with your final answer". majordomo's agent loop +// has no submit concept — it ends when the model replies WITHOUT tool +// calls. Dropping submit cold would make every prompt-trained model +// burn turns on "unknown tool \"submit\"" errors. +// +// The compatibility shape: the executors add NewSubmitTool's tool to +// every run's toolset (unless the palette already defines a `submit`). +// The handler records the FIRST submitted answer and tells the model +// the answer was accepted so its next turn is a bare reply (which ends +// the loop naturally). After the run, the executor consults +// Output(loopOutput, runErr): a captured submission wins over an empty +// or budget-exhausted ending, so a model that submits on its final +// allowed step still produces its answer instead of ErrMaxSteps. +type SubmitCapture struct { + mu sync.Mutex + output string + called bool +} + +// Record stores the first submitted answer; later calls are ignored +// (matching legacy agentkit's "multiple calls keep the first" contract). +func (c *SubmitCapture) Record(output string) { + c.mu.Lock() + defer c.mu.Unlock() + if c.called { + return + } + c.called = true + c.output = output +} + +// Submitted returns the captured answer and whether submit fired. +func (c *SubmitCapture) Submitted() (string, bool) { + c.mu.Lock() + defer c.mu.Unlock() + return c.output, c.called +} + +// Output resolves the run's final output: the submitted answer when the +// model called submit (parity with legacy agentkit, where submit's argument +// WAS the run output), otherwise the loop's own final text. 
resolvedErr +// is nil when a submission exists — a run that submitted its answer and +// then ran out of steps (or timed out composing the courtesy +// confirmation turn) is a SUCCESS, not an error. +func (c *SubmitCapture) Output(loopOutput string, runErr error) (output string, resolvedErr error) { + if out, ok := c.Submitted(); ok { + return out, nil + } + return loopOutput, runErr +} + +// submitArgs mirrors legacy agentkit's synthetic submit tool schema so +// models prompted under the old contract emit compatible calls. +type submitArgs struct { + Output string `json:"output" description:"The final answer, summary, or output for this task."` +} + +// NewSubmitTool builds the compatibility `submit` tool bound to the +// given capture. Both executors (skill + agent) install one per run. +func NewSubmitTool(capture *SubmitCapture) llm.Tool { + return llm.DefineTool[submitArgs]( + "submit", + "Submit your final answer or output to end this task. Call exactly once when you are done.", + func(_ context.Context, args submitArgs) (any, error) { + capture.Record(strings.TrimSpace(args.Output)) + return "Final answer recorded. Do not call any more tools; reply now with a brief closing message.", nil + }, + ) +} -- 2.52.0 From 8b30b9f889af37fc06189e28455d262fbaa56cfa Mon Sep 17 00:00:00 2001 From: Steve Dudenhoeffer Date: Fri, 26 Jun 2026 20:17:26 -0400 Subject: [PATCH 2/8] P2: define nil-safe run.Ports (the inversion spine) Add run/ports.go: the host seams the executor will consume, every one nil-safe so a light host runs with the zero Ports (no persistence/audit/ budget/critic/delegation/delivery) and a heavy host wires each to a battery. Ports mirror mort's existing interfaces so the batteries implement them directly: - Audit + RunRecorder (mort skillaudit.Storage/Writer): StartRun -> per-run recorder (OnStep/OnTool/LogEvent/Close), recorder satisfies RunTally. - Budget (mort skillexec.BudgetTracker): Check / Commit. 
- Critic + CriticHandle (mort agentcritic): Monitor -> handle with RecordStep/RecordToolStart/Steer/Deadline/Stop (the loop wiring finalizes with the executor merge). - Checkpointer (mort agentexec.RunCheckpointer): Save/Complete/Fail. - PaletteSource (mort SkillInvokerForPalette + AgentInvokerForPalette): Resolve/Invoke skill + agent delegation. Plus host-neutral RunInfo / RunStats. This completes the P2 inversion DESIGN; the agentexec+skillexec -> run.Executor merge that consumes these Ports is the remaining P2 work. Co-Authored-By: Claude Opus 4.8 (1M context) --- CLAUDE.md | 9 +-- run/ports.go | 168 +++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 173 insertions(+), 4 deletions(-) create mode 100644 run/ports.go diff --git a/CLAUDE.md b/CLAUDE.md index f9b1dca..0c43131 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -43,10 +43,11 @@ CORE (majordomo + stdlib): fanout/ programmatic N×M swarm [P0 ✓] deliver/ output egress seam (+ Discard/Stdout) [P0 ✓] identity/ caller identity seams [P0 ✓] - run/ run-loop mechanics (cancel-merge, finalizers, [P2 wip] - RunStateAccessor via RunTally seam, submit, - progress bridge) + RunnableAgent DTO done; - executor merge + nil-safe run.Ports next [P2] + run/ run-loop mechanics + RunnableAgent DTO + [P2 wip] + nil-safe run.Ports (Audit/Budget/Critic/ + Checkpointer/PaletteSource) defined; the + agentexec+skillexec -> run.Executor MERGE + (consuming Ports) is the remaining P2 work [P2] dispatchguard/ loop/depth/fan-out caps [P0 ✓] pendingattach/ attachment dedupe [P0 ✓] tool/ registry + 3-stage permissions + ssrf [P1 ✓] diff --git a/run/ports.go b/run/ports.go new file mode 100644 index 0000000..e18ac74 --- /dev/null +++ b/run/ports.go @@ -0,0 +1,168 @@ +package run + +import ( + "context" + "time" + + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" + + "gitea.stevedudenhoeffer.com/steve/executus/deliver" +) + +// Ports are the host seams the run executor consumes. 
Every field is nil-safe: +// a light host passes the zero Ports and gets a bounded, in-memory run with no +// persistence, audit, budget, critic, delegation, or delivery — which is +// exactly a gadfly swarm task. A heavy host (mort) wires each one to a battery. +// +// This struct IS the inversion: in mort, agentexec imports agents / +// agentcritic / skillaudit and skillexec imports skills / paste directly; here +// the kernel depends only on these interfaces, and the batteries implement +// them. The mort_*_adapters.go wall becomes the set of impls. +type Ports struct { + // Audit records the run trace (start, per-step/per-tool events, final + // stats). nil = no audit. + Audit Audit + // Budget gates and meters per-caller resource use. nil = unbounded. + Budget Budget + // Critic optionally monitors a long run for hangs/runaways. nil = none. + Critic Critic + // Checkpointer persists resumable progress for durable recovery. nil = no + // checkpointing (a run interrupted by shutdown is simply lost). + Checkpointer Checkpointer + // Palette resolves SkillPalette / SubAgentPalette entries into delegation + // tools (skill__ / agent__). nil = those entries are inert. + Palette PaletteSource + // Delivery is where the run's output + artifacts go. nil = the caller + // reads the Result in-process (the light-host default). + Delivery deliver.Delivery +} + +// RunInfo describes a run at start time — the attribution a recorder/critic +// needs. Host-neutral rename of mort's SkillRun start fields. +type RunInfo struct { + RunID string + SubjectID string // the agent/skill id being run (audit "skill_id") + Name string + CallerID string + ChannelID string + ParentRunID string + Inputs map[string]any + StartedAt time.Time +} + +// RunStats is the terminal roll-up a recorder's Close writes. Mirrors mort's +// skillaudit/skillexec RunStats. 
+type RunStats struct { + Status string // ok | error | timeout | budget_exceeded | cancelled | dry_run + Output string + Error string + ToolCalls int + RuntimeSeconds float64 + InputTokens int64 + OutputTokens int64 + ThinkingTokens int64 +} + +// --- Audit --- + +// Audit begins recording a run. StartRun returns a per-run RunRecorder (or nil +// to skip recording this run). The audit battery wires its Storage behind this. +type Audit interface { + StartRun(ctx context.Context, info RunInfo) RunRecorder +} + +// RunRecorder records the events of one in-flight run and its final stats. It +// satisfies RunTally so the kernel can surface live token/tool counts to the +// self-status tool. Mirrors mort's skillaudit.Writer. +type RunRecorder interface { + RunTally + // OnStep records one completed agent-loop iteration's model response. + OnStep(iter int, resp *llm.Response) + // OnTool records one executed tool call + its result. + OnTool(call llm.ToolCall, result string) + // LogEvent / LogError append structured events to the run log. + LogEvent(eventType string, payload map[string]any) + LogError(msg string) + // Close writes the terminal roll-up. Detaches from the caller's context + // internally so a cancelled run still records. + Close(ctx context.Context, stats RunStats) +} + +// --- Budget --- + +// Budget gates and meters per-caller resource use. Mirrors mort's +// skillexec.BudgetTracker. +type Budget interface { + // Check reports whether the caller has remaining budget (nil = allowed). + Check(ctx context.Context, callerID string) error + // Commit records that the caller spent runtimeSeconds on this run. + Commit(ctx context.Context, callerID string, runtimeSeconds float64) +} + +// --- Critic --- + +// Critic optionally monitors a long-running run (the two-tier soft/hard +// timeout). Monitor returns a handle the executor feeds progress into and +// queries for steer/deadline decisions; a nil handle means "not monitored". 
+// +// The exact wiring (how the handle's Steer/Deadline bind into majordomo's +// agent.WithSteer / agent.WithMaxStepsFunc / run-context cancellation) is +// finalized in the executor; this is the seam the agentcritic battery adapts. +type Critic interface { + Monitor(ctx context.Context, info RunInfo, softTimeout time.Duration) CriticHandle +} + +// CriticHandle is the executor's live link to a run's critic. +type CriticHandle interface { + // RecordStep / RecordToolStart keep the critic's activity clock fresh so a + // healthy-but-slow run is not mistaken for a hang. + RecordStep(iter int) + RecordToolStart(name, args string) + // Steer returns any messages the critic wants injected into the loop (a + // nudge), drained before each step — matches majordomo agent.WithSteer. + Steer() []llm.Message + // Deadline returns the current hard-kill deadline (the critic may extend + // it); the executor binds the run context to it. Zero = no hard deadline. + Deadline() time.Time + // Stop ends monitoring when the run finishes. + Stop() +} + +// --- Checkpointer --- + +// Checkpointer persists a run's resumable progress for durable recovery. +// Mirrors mort's agentexec.RunCheckpointer. +type Checkpointer interface { + // Save persists the run's current resumable progress (throttled). + Save(ctx context.Context, st RunCheckpointState) error + // Complete clears the checkpoint on success. + Complete(ctx context.Context) error + // Fail clears the checkpoint on terminal failure. A run interrupted by + // shutdown is left untouched so boot recovery picks it up. + Fail(ctx context.Context, err error) error +} + +// RunCheckpointState is the resumable snapshot a Checkpointer persists. Kept +// minimal here; the executor extends what it records during the merge. 
+type RunCheckpointState struct { + Messages []llm.Message + Iteration int +} + +// --- PaletteSource --- + +// PaletteSource resolves a RunnableAgent's SkillPalette / SubAgentPalette names +// into delegation tools and invokes them. Mirrors mort's +// SkillInvokerForPalette + AgentInvokerForPalette. nil Palette => palette +// entries are inert ("not configured" at first call). +type PaletteSource interface { + ResolveSkill(ctx context.Context, callerID, name string) (skillID string, err error) + InvokeSkill(ctx context.Context, callerID, channelID, name string, + inputs map[string]any, parentRunID string) (output, runID, status string, err error) + + ResolveAgent(ctx context.Context, callerID, name string) (agentID string, err error) + InvokeAgent(ctx context.Context, callerID, channelID, name string, + prompt, parentRunID, modelTierOverride, promptPrepend string, + toolsSubset []string, + onEvent func(ctx context.Context, event, emoji string)) (output, runID, status string, err error) +} -- 2.52.0 From fe5074c3cf21ede91945eec4c4907a666df3d631 Mon Sep 17 00:00:00 2001 From: Steve Dudenhoeffer Date: Fri, 26 Jun 2026 20:22:26 -0400 Subject: [PATCH 3/8] ci: sync gadfly review config to mort's foreman-provider setup Mirror mort's updated adversarial-review.yml: m1/m5 pulled in via the GADFLY_ENDPOINT_M1/_M5 secrets using gadfly's "foreman" provider type (providers m1/m5; models m1/qwen3:14b, m5/qwen3.6:35b-mlx), 2 cloud models, 3-lens suite, pinned to the gadfly :sha-6e3a83c image. Header adjusted for executus; functional config identical to mort's tested version. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- .gitea/workflows/adversarial-review.yml | 36 ++++++++++++++++--------- 1 file changed, 24 insertions(+), 12 deletions(-) diff --git a/.gitea/workflows/adversarial-review.yml b/.gitea/workflows/adversarial-review.yml index ec1c21a..6c5182b 100644 --- a/.gitea/workflows/adversarial-review.yml +++ b/.gitea/workflows/adversarial-review.yml @@ -1,10 +1,11 @@ # Gadfly — agentic adversarial PR reviewer (https://gitea.stevedudenhoeffer.com/steve/gadfly). # -# Runs the published Gadfly image (pinned to :v1) as a specialist swarm and posts +# Runs the published Gadfly image (pinned to an immutable :sha- tag — act_runner +# caches :latest, and this build is what carries foreman provider-type support) +# as a specialist swarm and posts # ONE consolidated review comment as gitea-actions. Advisory only — never blocks a -# merge. This reviews executus PRs (same setup as mort: m1/m5 locals + 2 cloud, -# 3-lens suite). Gadfly is a simple system — treat its findings as advisory and -# double-check before acting. +# merge. This reviews executus PRs (same setup as mort: m1/m5 foreman locals + 2 +# cloud, 3-lens suite). Gadfly is a simple system — findings are advisory; double-check. name: Adversarial Review (Gadfly) @@ -44,20 +45,31 @@ jobs: # every PR with the 3-lens suite — the slow local lanes dominate wall time. timeout-minutes: 90 steps: - - uses: docker://gitea.stevedudenhoeffer.com/steve/gadfly:v1 + - uses: docker://gitea.stevedudenhoeffer.com/steve/gadfly:sha-6e3a83c env: GITEA_API: ${{ github.server_url }}/api/v1/repos/${{ github.repository }} GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }} OLLAMA_CLOUD_API_KEY: ${{ secrets.OLLAMA_CLOUD_API_KEY }} - # Local Ollama boxes (each its own lane, cap 1). NOTE: both Macs must be - # awake/reachable for their reviews to run; if a box is offline, that - # model's comment shows an error and the others still post. 
- GADFLY_ENDPOINT_M1PRO: "ollama|http://192.168.0.175:11434" - GADFLY_ENDPOINT_M5MAX: "ollama|http://192.168.0.173:11434" + # Local Macs, reached through their foreman queues (native Ollama on the + # wire). Gadfly's GADFLY_ENDPOINT_* form with the "foreman" provider + # type: GADFLY_ENDPOINT_M1 registers provider "m1", _M5 registers "m5", + # each building a foreman-preset Ollama client at the given URL. Values + # (host + token) live in gitea secrets, each of the form: + # foreman|https://<host>|<token> + # (converted from the komodo LLM_* DSNs foreman://<token>@<host>). + # REQUIRES a Gadfly image built with foreman provider-type support + # (the GADFLY_ENDPOINT "foreman|..." type); on an older image the m1/m5 + # lanes error with "unknown provider foreman". The HTTPS-only LLM_* + # foreman:// DSN is the alternative that needs no image rebuild. + # NOTE: the Mac behind each foreman must still be awake/reachable; if a + # box is offline, that model's comment shows an error and the others + # still post. (Gitea secrets aren't auto-exposed — map each explicitly.) + GADFLY_ENDPOINT_M1: ${{ secrets.GADFLY_ENDPOINT_M1 }} + GADFLY_ENDPOINT_M5: ${{ secrets.GADFLY_ENDPOINT_M5 }} # 2 cloud (parallel) + M1 Pro + M5 Max — one consolidated comment each. - GADFLY_MODELS: "minimax-m3:cloud,deepseek-v4-flash:cloud,m1pro/qwen3:14b,m5max/qwen3.6:35b-mlx" + GADFLY_MODELS: "minimax-m3:cloud,deepseek-v4-flash:cloud,m1/qwen3:14b,m5/qwen3.6:35b-mlx" # cloud runs 2 at once; each Mac one at a time; all three lanes parallel. - GADFLY_PROVIDER_CONCURRENCY: "ollama-cloud=2,m1pro=1,m5max=1" + GADFLY_PROVIDER_CONCURRENCY: "ollama-cloud=2,m1=1,m5=1" # Default => the 3-lens suite (security, correctness, error-handling). # Set the repo var GADFLY_SPECIALISTS to override (csv / "all" / "auto").
GADFLY_SPECIALISTS: ${{ vars.GADFLY_SPECIALISTS || 'security,correctness,error-handling' }} -- 2.52.0 From 9a89d588b65f261ea85de21f1c2f572b44121e68 Mon Sep 17 00:00:00 2001 From: Steve Dudenhoeffer Date: Fri, 26 Jun 2026 20:32:58 -0400 Subject: [PATCH 4/8] fix: address gadfly P1 review (3 low-risk findings) Triaged gadfly's P1 review (advisory). Fixed the three clearly-correct, low-risk items; the rest were pre-existing mort behavior or theoretical: - model/call.go: recordUsage dropped fully-cached responses (input==0 && output==0 early-return missed CacheRead/CacheWrite-only usage, which Anthropic/OpenAI prompt-caching bills). Guard now also checks cache tokens. - llmmeta/helper.go: recordLedger swallowed Storage.RecordMetaCall errors; now logs them (slog.Warn) so a non-logging Storage impl can't silently drop audit rows. - model/cloud_sync.go: the ollama.com limit-cache used unbounded io.ReadAll; wrapped both reads in io.LimitReader(1 MiB) so a misbehaving endpoint can't exhaust memory before the 15s timeout. Noted-not-fixed (follow-ups / pre-existing mort semantics): tier_not_allowed ledger label on resolution failure, unknown-model usage attribution, the cloud_sync https scheme allowlist, and several theoretical/cosmetic items. Co-Authored-By: Claude Opus 4.8 (1M context) --- llmmeta/helper.go | 5 ++++- model/call.go | 2 +- model/cloud_sync.go | 9 +++++++-- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/llmmeta/helper.go b/llmmeta/helper.go index 3fabdd0..f5773e2 100644 --- a/llmmeta/helper.go +++ b/llmmeta/helper.go @@ -34,6 +34,7 @@ import ( "context" "encoding/json" "fmt" + "log/slog" "strings" "time" @@ -574,7 +575,9 @@ func (h *Helper) recordLedger(ctx context.Context, call MetaCall) { if h.storage == nil { return } - _ = h.storage.RecordMetaCall(ctx, call) + if err := h.storage.RecordMetaCall(ctx, call); err != nil { + slog.Warn("llmmeta: failed to record ledger row", "err", err) + } } // tryParseJSON attempts to decode text as JSON. 
Returns the parsed diff --git a/model/call.go b/model/call.go index 30bb964..7e07c07 100644 --- a/model/call.go +++ b/model/call.go @@ -237,7 +237,7 @@ func recordUsage(ctx context.Context, resp *llm.Response) { return } u := resp.Usage - if u.InputTokens == 0 && u.OutputTokens == 0 { + if u.InputTokens == 0 && u.OutputTokens == 0 && u.CacheReadTokens == 0 && u.CacheWriteTokens == 0 { return } model := resolvedModelName(ctx, resp) diff --git a/model/cloud_sync.go b/model/cloud_sync.go index ae28512..c214ca1 100644 --- a/model/cloud_sync.go +++ b/model/cloud_sync.go @@ -314,7 +314,7 @@ func (c *CloudOllamaLimitCache) fetchTags(ctx context.Context) ([]string, error) return nil, err } defer resp.Body.Close() - body, err := io.ReadAll(resp.Body) + body, err := io.ReadAll(io.LimitReader(resp.Body, maxLimitCacheResponseBytes)) if err != nil { return nil, err } @@ -367,7 +367,7 @@ func (c *CloudOllamaLimitCache) fetchContextLength(ctx context.Context, modelNam return 0, err } defer resp.Body.Close() - respBody, err := io.ReadAll(resp.Body) + respBody, err := io.ReadAll(io.LimitReader(resp.Body, maxLimitCacheResponseBytes)) if err != nil { return 0, err } @@ -451,3 +451,8 @@ func truncate(b []byte, n int) string { } return string(b[:n]) + "...(truncated)" } + +// maxLimitCacheResponseBytes bounds the ollama.com limit-cache HTTP responses +// (/api/tags, /api/show) so a misbehaving endpoint can't stream an unbounded +// body before the 15s timeout fires. 1 MiB is far above any real response. +const maxLimitCacheResponseBytes = 1 << 20 -- 2.52.0 From 4132af0216ae08727e8ba7bfd56fb8ed2de98e30 Mon Sep 17 00:00:00 2001 From: Steve Dudenhoeffer Date: Fri, 26 Jun 2026 20:36:32 -0400 Subject: [PATCH 5/8] P2: move compactor -> compact/ + step instrumentation -> run/steps.go - compact/compactor.go: the per-run stateful context compactor (token-threshold gate, fast-tier middle summarisation, fold memory) lifted from mort's skillexec/compactor.go. 
Self-contained; its only dependency is a ModelResolver func (model.ParseModelForContext satisfies it) + a token threshold. - run/steps.go: the step-emission/instrumentation (stepEmitter, tool->kind/ summary mapping with redaction, Result.Steps accumulation) from agentexec, repointed onto executus/tool. Both build green. With the run-loop mechanics, RunnableAgent DTO, run.Ports, compactor, and step instrumentation now all in place, the remaining P2 work is the run.Executor itself (wiring these + majordomo's agent loop), which makes executus runnable. Co-Authored-By: Claude Opus 4.8 (1M context) --- CLAUDE.md | 5 +- compact/compactor.go | 331 +++++++++++++++++++++++++++++++++++ run/steps.go | 407 +++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 741 insertions(+), 2 deletions(-) create mode 100644 compact/compactor.go create mode 100644 run/steps.go diff --git a/CLAUDE.md b/CLAUDE.md index 2c38b9d..06ee6ed 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -45,7 +45,8 @@ CORE (majordomo + stdlib): identity/ caller identity seams [P0 ✓] run/ run-loop mechanics + RunnableAgent DTO + [P2 wip] nil-safe run.Ports (Audit/Budget/Critic/ - Checkpointer/PaletteSource) defined; the + Checkpointer/PaletteSource) + step + instrumentation (steps.go) done; the agentexec+skillexec -> run.Executor MERGE (consuming Ports) is the remaining P2 work [P2] dispatchguard/ loop/depth/fan-out caps [P0 ✓] @@ -55,7 +56,7 @@ CORE (majordomo + stdlib): (convar->config.Source; UsageSink/TraceSink seams; GenerateWith[T] structured output — no separate structured/ pkg) llmmeta/ shared meta-LLM helper over model/ [P1 ✓] - compact/ context compactor (WithCompactor hook) [P2] + compact/ context compactor (WithCompactor hook) [P2 ✓] tools/{web,net,store,compose,meta,comms} generic tools [P3] BATTERIES (opt-in siblings, each nil-safe + a default): diff --git a/compact/compactor.go b/compact/compactor.go new file mode 100644 index 0000000..0749ada --- /dev/null +++ b/compact/compactor.go @@ -0,0 +1,331 
@@ +// V15.2 context compactor (re-based on majordomo). +// +// Why: the agent loop accumulates tool results indefinitely. A +// research-heavy run with many web_search / read_page / http_get +// results easily crosses 200K tokens and trips the model's HTTP-400 +// "prompt too long" rejection mid-run (observed at 410K tokens on +// qwen3-coder:480b which has a 262K cap). majordomo's agent loop calls +// the compactor with the full message slice before every model call; +// the compactor returns a shorter slice that preserves the system +// prompt + recent messages, with the middle range replaced by a +// synthetic summary. +// +// Strategy (unchanged from the agentkit era): +// - Keep any leading system message verbatim. (Under majordomo the +// system prompt normally travels in Request.System, not in the +// message slice, so this is defensive.) +// - Keep the last KeepRecent messages verbatim. This ensures the +// agent has fresh tool state to act on; compacting too aggressively +// would strip the in-flight context it needs to make the next +// decision. +// - Compress the middle range via a single fast-tier LLM call that +// receives the middle messages as raw text and produces a paragraph +// summary (URLs visited, key findings, file_ids created, what the +// agent is trying to accomplish). +// - Replace the middle range with one synthetic user-role message +// containing the summary. (user-role chosen because tool-result-role +// would be ambiguous without a matching tool_call_id.) +// +// What moved in the majordomo conversion: +// - The token-threshold gate lives HERE now. agentkit estimated +// tokens and only invoked the compactor past a configured +// threshold; majordomo's hook fires before every model call, so +// the threshold check (estimateTokens vs the per-run threshold the +// executor computes from the model's context limit) is the +// compactor's first step. 
+// - The compactor is per-run STATEFUL: majordomo does not replace +// the loop's internal transcript with the compacted slice (the +// hook shapes only what is SENT), so without memory the middle +// would be re-summarised from scratch on every step past the +// threshold. The state remembers how far the transcript has been +// folded into the running summary and folds the previous summary +// into the next one instead of re-paying for it. +// +// Failure path: any error (LLM unavailable, malformed response, etc.) +// is returned to the agent loop, which treats compactor errors as +// non-fatal and sends the original slice — if the next model call hits +// the provider's limit, the existing HTTP-400 error path takes over. + +package compact + +import ( + "context" + "errors" + "fmt" + "strings" + "sync" + + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" +) + +// ModelResolver resolves a tier/spec to a usable llm.Model (and an enriched +// context for usage attribution). model.ParseModelForContext satisfies it. +type ModelResolver func(ctx context.Context, tier string) (context.Context, llm.Model, error) + +// Compactor is the per-run compaction hook handed to the agent loop +// (matches the signature agent.WithCompactor expects). +type Compactor = func(ctx context.Context, msgs []llm.Message) ([]llm.Message, error) + +// CompactionEvent describes one fired compaction so the executor can +// log it to skill_run_logs ("compaction_fired"). +type CompactionEvent struct { + // MessagesBefore/After count the messages that would have been + // sent without/with this compaction. + MessagesBefore int + MessagesAfter int + // TokensBefore/After are the estimateTokens values for the same + // two slices. + TokensBefore int + TokensAfter int +} + +// CompactorFactory mints a fresh per-run Compactor bound to a token +// threshold. onFire (nil-safe) observes every compaction that actually +// fires. A non-positive threshold yields a pass-through compactor. 
+type CompactorFactory func(thresholdTokens int, onFire func(CompactionEvent)) Compactor + +// CompactorConfig controls the v15.2 compactor's behaviour. Construct +// in mort.go from convars + the executor's ModelResolver. +type CompactorConfig struct { + // Models resolves a model spec (typically "fast" or a specific tier) + // to an llm.Model. Required; nil disables compaction. + Models ModelResolver + + // SummarizerTier is the model tier used for the compression LLM call. + // Production default "fast"; an admin may set "haiku" or a specific + // model spec. Empty falls back to "fast". + SummarizerTier string + + // KeepRecent is the number of trailing messages preserved verbatim. + // Default 8. Lower values compact more aggressively (the next loop + // iteration sees less recent context); higher values keep more. + KeepRecent int + + // SummaryWordCap bounds the LLM-generated summary length. Default + // 200 words ≈ 800 chars ≈ 200 tokens — small enough that the + // compaction always shrinks the slice meaningfully. + SummaryWordCap int +} + +// NewCompactor returns a CompactorFactory implementing the middle-range +// summarisation strategy. nil cfg.Models returns a factory of no-op +// compactors that always return the input unchanged (degrades to +// v15.1 behaviour). 
+func NewCompactor(cfg CompactorConfig) CompactorFactory { + if cfg.Models == nil { + return func(int, func(CompactionEvent)) Compactor { + return func(_ context.Context, msgs []llm.Message) ([]llm.Message, error) { + return msgs, nil + } + } + } + if cfg.SummarizerTier == "" { + cfg.SummarizerTier = "fast" + } + if cfg.KeepRecent <= 0 { + cfg.KeepRecent = 8 + } + if cfg.SummaryWordCap <= 0 { + cfg.SummaryWordCap = 200 + } + return func(threshold int, onFire func(CompactionEvent)) Compactor { + st := &compactionState{} + return func(ctx context.Context, msgs []llm.Message) ([]llm.Message, error) { + return compactIfNeeded(ctx, cfg, st, threshold, onFire, msgs) + } + } +} + +// compactionState is the per-run fold memory: msgs[:prefixEnd] +// (excluding a leading system message) are represented by summaryText +// in the rendered slice. Guarded by mu for safety although the agent +// loop invokes the hook from a single goroutine. +type compactionState struct { + mu sync.Mutex + prefixEnd int + summaryText string +} + +// compactIfNeeded is the workhorse: render the transcript with any +// existing fold applied, check the threshold, and fold more of the +// middle into the summary when the rendered size still exceeds it. +func compactIfNeeded(ctx context.Context, cfg CompactorConfig, st *compactionState, + threshold int, onFire func(CompactionEvent), msgs []llm.Message) ([]llm.Message, error) { + st.mu.Lock() + defer st.mu.Unlock() + + rendered := renderCompacted(st, msgs) + if threshold <= 0 { + return rendered, nil + } + tokensBefore := estimateTokens(rendered) + if tokensBefore < threshold { + return rendered, nil + } + + // Determine the new middle range to fold: everything between what + // is already summarised (or the optional leading system message) + // and the KeepRecent tail. 
+ startMiddle := st.prefixEnd + if startMiddle == 0 && len(msgs) > 0 && msgs[0].Role == llm.RoleSystem { + startMiddle = 1 + } + endMiddle := len(msgs) - cfg.KeepRecent + if endMiddle <= startMiddle { + // Nothing new to fold (the tail alone exceeds the threshold). + // Return the rendered slice; the model call may still succeed. + return rendered, nil + } + middle := msgs[startMiddle:endMiddle] + + summary, err := summariseMiddle(ctx, cfg, st.summaryText, middle) + if err != nil { + // Non-fatal upstream: the agent loop sends the original slice. + return rendered, fmt.Errorf("compactor: summarise middle: %w", err) + } + st.summaryText = summary + st.prefixEnd = endMiddle + + out := renderCompacted(st, msgs) + if onFire != nil { + onFire(CompactionEvent{ + MessagesBefore: len(rendered), + MessagesAfter: len(out), + TokensBefore: tokensBefore, + TokensAfter: estimateTokens(out), + }) + } + return out, nil +} + +// renderCompacted applies the fold state to msgs: [optional system] + +// [synthetic summary] + msgs[prefixEnd:]. With no fold yet, msgs is +// returned unchanged. +func renderCompacted(st *compactionState, msgs []llm.Message) []llm.Message { + if st.prefixEnd <= 0 || st.prefixEnd > len(msgs) { + return msgs + } + tail := msgs[st.prefixEnd:] + out := make([]llm.Message, 0, len(tail)+2) + if msgs[0].Role == llm.RoleSystem { + out = append(out, msgs[0]) + } + out = append(out, llm.UserText( + "[CONTEXT COMPACTED] The earlier portion of this conversation was summarised "+ + "to fit the model's context window. Summary:\n\n"+st.summaryText+ + "\n\nResume from the recent messages below.")) + out = append(out, tail...) + return out +} + +// summariseMiddle composes a "compress this transcript" prompt and +// fires one fast-tier LLM call. prevSummary (may be empty) is the +// running summary from earlier compactions; it is folded into the new +// summary so prior context is not lost. 
+func summariseMiddle(ctx context.Context, cfg CompactorConfig, prevSummary string, middle []llm.Message) (string, error) { + if len(middle) == 0 { + return "", errors.New("compactor: empty middle range") + } + modelCtx, model, err := cfg.Models(ctx, cfg.SummarizerTier) + if err != nil { + return "", fmt.Errorf("compactor: resolve summarizer model %q: %w", cfg.SummarizerTier, err) + } + if model == nil { + return "", errors.New("compactor: summarizer model resolved to nil") + } + if modelCtx != nil { + ctx = modelCtx + } + + var prior string + if strings.TrimSpace(prevSummary) != "" { + prior = "AN EARLIER PORTION WAS ALREADY SUMMARISED AS:\n" + prevSummary + + "\n\nFold that summary into your new one — its facts must survive.\n\n" + } + transcript := renderTranscript(middle) + prompt := fmt.Sprintf( + "You are compressing an in-flight agent's conversation transcript so the agent "+ + "can continue working without blowing its model context. The transcript below is "+ + "a sequence of tool calls and their results. Produce a single paragraph (under %d words) "+ + "that captures:\n"+ + " - WHAT the agent has been trying to accomplish.\n"+ + " - WHICH URLs were visited / fetched (list inline, comma-separated).\n"+ + " - KEY findings or decisions (factual results the agent will need later).\n"+ + " - ANY file_ids or KV keys the agent created — these are persistent state references the agent must keep.\n"+ + " - ANY errors or dead-ends that the agent should not re-try.\n"+ + "DO NOT include verbose HTTP headers, tool-call metadata, error stack traces, or repetitive content. "+ + "DO NOT add commentary or markdown headers. 
Output prose only.\n\n"+ + "%sTRANSCRIPT TO COMPRESS:\n%s", + cfg.SummaryWordCap, + prior, + transcript, + ) + + resp, err := model.Generate(ctx, llm.Request{Messages: []llm.Message{llm.UserText(prompt)}}) + if err != nil { + return "", fmt.Errorf("compactor: summarise LLM call: %w", err) + } + text := strings.TrimSpace(resp.Text()) + if text == "" { + return "", errors.New("compactor: summarizer returned empty text") + } + return text, nil +} + +// estimateTokens is the chars/4 heuristic over a message slice's text, +// tool calls, and tool results. Images count a flat ~1K tokens each. +// It intentionally matches the coarse estimator the old agentkit loop +// used — the 0.7 threshold ratio provides the safety margin. +func estimateTokens(msgs []llm.Message) int { + chars := 0 + for _, m := range msgs { + for _, p := range m.Parts { + switch v := p.(type) { + case llm.TextPart: + chars += len(v.Text) + case llm.ImagePart: + chars += 4096 + } + } + for _, tc := range m.ToolCalls { + chars += len(tc.Name) + len(tc.Arguments) + } + for _, tr := range m.ToolResults { + chars += len(tr.Content) + } + } + return chars / 4 +} + +// transcriptMessageCap bounds individual message bodies at ~2KB so a +// single ultra-long tool result can't dominate the prompt sent to the +// summarizer. +const transcriptMessageCap = 2048 + +// renderTranscript flattens a message slice to a plain-text transcript +// suitable for the summarisation prompt. Tool calls show name + args, +// tool results show name + body. Empty fields are skipped. 
+func renderTranscript(msgs []llm.Message) string { + var sb strings.Builder + for i, m := range msgs { + fmt.Fprintf(&sb, "---\n[%d] role=%s\n", i+1, m.Role) + if text := m.Text(); text != "" { + sb.WriteString(truncate(text, transcriptMessageCap)) + sb.WriteString("\n") + } + for _, tc := range m.ToolCalls { + fmt.Fprintf(&sb, "tool_call name=%s args=%s\n", tc.Name, truncate(string(tc.Arguments), transcriptMessageCap)) + } + for _, tr := range m.ToolResults { + fmt.Fprintf(&sb, "tool_result name=%s body=%s\n", tr.Name, truncate(tr.Content, transcriptMessageCap)) + } + } + return sb.String() +} + +func truncate(s string, n int) string { + if len(s) <= n { + return s + } + return s[:n] + "...(truncated)" +} diff --git a/run/steps.go b/run/steps.go new file mode 100644 index 0000000..670e09a --- /dev/null +++ b/run/steps.go @@ -0,0 +1,407 @@ +package run + +// steps.go — the per-run step emitter and the tool→step presentation +// mapping. This is the single place that turns the executor's two loop +// chokepoints (the pre-dispatch tool hook + the post-step observer in +// executor.go) into ordered tool.Step records: one per tool call, +// each with a stable id, an open-vocabulary kind, and a human +// present-tense summary that flips running→complete/error. +// +// One source feeds two consumers (mirroring the OnEvent/OnToolEvent/ +// PostRunResult pattern): the live tool.Invocation.OnStep callback +// (nil-safe) AND snapshot(), which the executor copies onto Result.Steps. +// Because the Result accumulation does not depend on OnStep being set, +// every surface — chat (JSON + SSE), Discord, cron, sub-agents — carries +// steps; OnStep is needed only for live streaming. + +import ( + "context" + "encoding/json" + "fmt" + "net/url" + "strings" + "time" + + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" + + "gitea.stevedudenhoeffer.com/steve/executus/tool" +) + +// stepSummaryMaxLen caps the human summary length (section G size cap). 
+// Detail is unused in v1 (no live detail source while replies are +// generated blocking) so there is no Detail cap yet. +const stepSummaryMaxLen = 200 + +// stepEmitter accumulates ordered steps for one run and fires the live +// OnStep callback. +// +// Concurrency: touched ONLY from the agent-loop goroutine. Both call +// sites (the hookToolbox `before` closure and the stepObserver) run +// there; majordomo executes a step's tool calls sequentially, and +// sub-agents build their own Invocation so they never reach this +// emitter. Same single-goroutine contract as the audit Writer and the +// critic ProgressRecorder — no internal lock. +type stepEmitter struct { + onStep func(ctx context.Context, ev tool.StepEvent) + now func() time.Time + + seq int + steps []tool.Step // ordered; the snapshot for Result.Steps + byID map[string]int // step id -> index into steps + pending map[string][]string // correlation key -> queued running ids (FIFO) +} + +// newStepEmitter returns an emitter that forwards to onStep (nil-safe). +func newStepEmitter(onStep func(ctx context.Context, ev tool.StepEvent)) *stepEmitter { + return &stepEmitter{ + onStep: onStep, + now: time.Now, + byID: map[string]int{}, + pending: map[string][]string{}, + } +} + +// corrKey correlates a "start" (name + raw args, no call id available at +// the pre-dispatch hook) with its later "end" (the stepObserver has the +// full call incl. id + the same raw args). +func corrKey(name string, args json.RawMessage) string { + return name + "\x00" + string(args) +} + +// toolStart records + emits the "start" of a tool call. Called from the +// pre-dispatch hookToolbox closure, before the tool runs. 
+func (e *stepEmitter) toolStart(ctx context.Context, name string, args json.RawMessage) { + if e == nil { + return + } + step := e.newStep(name, args) + key := corrKey(name, args) + e.pending[key] = append(e.pending[key], step.ID) + e.fire(ctx, "start", step) +} + +// toolEnd records + emits the terminal "end" of a tool call. Called from +// the stepObserver for each completed tool call. If no matching start was +// seen (e.g. a tool with a nil handler the pre-dispatch hook skipped), a +// start is synthesized so the step still appears. +func (e *stepEmitter) toolEnd(ctx context.Context, call llm.ToolCall, result string, isError bool) { + if e == nil { + return + } + id := e.matchPending(call.Name, call.Arguments) + if id == "" { + id = e.newStep(call.Name, call.Arguments).ID + } + idx, ok := e.byID[id] + if !ok { + return + } + step := &e.steps[idx] + end := e.now() + step.EndedAt = &end + if isError { + step.Status = "error" + } else { + step.Status = "complete" + } + if s := summaryForEnd(call.Name, call.Arguments, result, isError); s != "" { + step.Summary = s + } + e.fire(ctx, "end", *step) +} + +// newStep mints + appends a running step and returns it (by value). +func (e *stepEmitter) newStep(name string, args json.RawMessage) tool.Step { + e.seq++ + step := tool.Step{ + ID: fmt.Sprintf("s%d", e.seq), + Kind: kindForTool(name), + Title: name, + Summary: summaryForStart(name, args), + Status: "running", + StartedAt: e.now(), + } + e.byID[step.ID] = len(e.steps) + e.steps = append(e.steps, step) + return step +} + +// matchPending pops the oldest running step id for (name, args). Falls +// back to the most recent still-running step of the same tool name when +// the args don't byte-match between start and end. Returns "" on no match. 
+func (e *stepEmitter) matchPending(name string, args json.RawMessage) string { + key := corrKey(name, args) + if q := e.pending[key]; len(q) > 0 { + id := q[0] + if len(q) == 1 { + delete(e.pending, key) + } else { + e.pending[key] = q[1:] + } + return id + } + for i := len(e.steps) - 1; i >= 0; i-- { + if e.steps[i].Title == name && e.steps[i].Status == "running" { + return e.steps[i].ID + } + } + return "" +} + +func (e *stepEmitter) fire(ctx context.Context, phase string, step tool.Step) { + if e.onStep == nil { + return + } + e.onStep(ctx, tool.StepEvent{Phase: phase, Step: step}) +} + +// snapshot returns a copy of the ordered, deduplicated step set for the +// run Result. A step still "running" at run end (e.g. the run was +// cancelled mid-tool-call) is reported as-is. +func (e *stepEmitter) snapshot() []tool.Step { + if e == nil || len(e.steps) == 0 { + return nil + } + out := make([]tool.Step, len(e.steps)) + copy(out, e.steps) + return out +} + +// kindForTool maps a tool name to an open-vocabulary step kind. Unknown +// tools fall back to "tool" — never an error, just a generic step (the +// client maps unknown kinds to a default icon). Loosely tracks the +// catalog in pkg/skilltools/CLAUDE.md. 
func kindForTool(name string) string {
	// Exact names win; the image heuristic only applies to unlisted tools.
	if kind, known := stepKindByTool[name]; known {
		return kind
	}
	if strings.HasPrefix(name, "image") || strings.Contains(name, "draw") {
		return "image"
	}
	return "tool"
}

// stepKindByTool is the exact-name part of the tool -> step-kind mapping
// used by kindForTool.
var stepKindByTool = map[string]string{
	"web_search":        "search",
	"search_reddit":     "search",
	"wikipedia_summary": "search",

	"read_page":         "read",
	"read_pdf":          "read",
	"read_reddit":       "read",
	"read_video":        "read",
	"verify_url":        "read",
	"summary_summarise": "read",
	"summarize":         "read",
	"file_get_text":     "read",
	"file_get_metadata": "read",
	"http_get":          "read",
	"http_post":         "read",
	"http_get_stream":   "read",
	"http_stream_read":  "read",

	"code_exec": "code",
	"calculate": "code",

	"file_save":   "file",
	"file_get":    "file",
	"file_list":   "file",
	"file_delete": "file",
	"file_search": "file",

	"kv_get":               "memory",
	"kv_set":               "memory",
	"kv_list":              "memory",
	"kv_delete":            "memory",
	"remember":             "memory",
	"recall":               "memory",
	"chatbot_get_memories": "memory",

	"query":                 "delegate",
	"query_research":        "delegate",
	"deepresearch":          "delegate",
	"animate":               "delegate",
	"agent_invoke":          "delegate",
	"agent_invoke_parallel": "delegate",
	"agent_spawn":           "delegate",
	"agent_spawn_parallel":  "delegate",
	"skill_invoke":          "delegate",
	"skill_invoke_parallel": "delegate",

	"think": "thinking",
}

// summaryForStart builds the human present-tense running summary. It
// derives specifics from safe arg fields only; secret-bearing tools
// (mcp_call, email_send, http_*) are summarized without echoing args.
+func summaryForStart(name string, args json.RawMessage) string { + var s string + switch name { + case "web_search": + if q := argString(args, "query", "q"); q != "" { + s = fmt.Sprintf("Searching the web for %q", q) + } else { + s = "Searching the web" + } + case "search_reddit": + if q := argString(args, "query", "q"); q != "" { + s = fmt.Sprintf("Searching Reddit for %q", q) + } else { + s = "Searching Reddit" + } + case "wikipedia_summary": + if q := argString(args, "query", "title"); q != "" { + s = fmt.Sprintf("Looking up %q on Wikipedia", q) + } else { + s = "Looking up Wikipedia" + } + case "read_page", "read_pdf", "read_reddit", "read_video", "verify_url": + if u := argString(args, "url", "post", "page"); u != "" { + s = "Reading " + hostOf(u) + } else { + s = "Reading a page" + } + case "http_get", "http_post", "http_get_stream": + // Show host only — a full URL can embed credentials/tokens. + if u := argString(args, "url"); u != "" { + s = "Fetching " + hostOf(u) + } else { + s = "Making an HTTP request" + } + case "summary_summarise", "summarize": + s = "Summarizing text" + case "translate": + if lang := argString(args, "target_lang", "target_language", "lang"); lang != "" { + s = "Translating to " + lang + } else { + s = "Translating text" + } + case "code_exec": + s = "Running code" + case "calculate": + if q := argString(args, "query", "expression", "expr"); q != "" { + s = "Calculating " + truncateStep(q, 60) + } else { + s = "Calculating" + } + case "remember": + // Never echo the stored value. 
+ s = "Saving a memory" + case "recall", "chatbot_get_memories": + s = "Recalling memories" + case "kv_get", "kv_list": + s = "Reading saved data" + case "kv_set": + s = "Saving data" + case "kv_delete": + s = "Deleting saved data" + case "file_save": + if n := argString(args, "name", "filename"); n != "" { + s = "Saving file " + truncateStep(n, 60) + } else { + s = "Saving a file" + } + case "file_get", "file_get_text", "file_get_metadata": + s = "Reading a file" + case "file_list", "file_search": + s = "Listing files" + case "query", "query_research": + if q := argString(args, "query", "question", "prompt", "task"); q != "" { + s = "Researching " + truncateStep(q, 80) + } else { + s = "Researching" + } + case "deepresearch": + s = "Running deep research" + case "animate": + s = "Generating an animation" + case "agent_invoke", "agent_spawn": + if a := argString(args, "agent", "agent_name", "name"); a != "" { + s = "Delegating to " + a + } else { + s = "Delegating to a sub-agent" + } + case "agent_invoke_parallel", "agent_spawn_parallel": + s = "Delegating to sub-agents" + case "skill_invoke": + if sk := argString(args, "skill_name", "skill", "name"); sk != "" { + s = "Running skill " + sk + } else { + s = "Running a skill" + } + case "skill_invoke_parallel": + s = "Running skills in parallel" + case "think": + s = "Thinking" + case "mcp_call": + // Redact: MCP args frequently carry secrets. Name server/tool only. + srv, tl := argString(args, "server"), argString(args, "tool") + switch { + case srv != "" && tl != "": + s = fmt.Sprintf("Calling %s/%s", srv, tl) + case srv != "": + s = "Calling " + srv + default: + s = "Calling an MCP tool" + } + case "email_send": + // Redact recipients + body. + s = "Sending an email" + default: + s = "Using " + name + } + return truncateStep(s, stepSummaryMaxLen) +} + +// summaryForEnd optionally upgrades the summary to a cheap result phrase. +// Returns "" to keep the running summary (the caller then just flips the +// status). 
Never returns a phrase derived from raw result bytes. +func summaryForEnd(name string, _ json.RawMessage, result string, isError bool) string { + if isError { + return "" + } + switch name { + case "web_search", "search_reddit": + if n := countResults(result); n >= 0 { + return fmt.Sprintf("Found %d result%s", n, plural(n)) + } + } + return "" +} + +// argString pulls the first present non-empty string field from a tool's +// raw JSON args, trying keys in order. Returns "" when none parse. +func argString(args json.RawMessage, keys ...string) string { + if len(args) == 0 { + return "" + } + var m map[string]any + if err := json.Unmarshal(args, &m); err != nil { + return "" + } + for _, k := range keys { + if v, ok := m[k]; ok { + if s, ok := v.(string); ok && strings.TrimSpace(s) != "" { + return strings.TrimSpace(s) + } + } + } + return "" +} + +// countResults parses a v11-style {"results":[...]} envelope and returns +// the count, or -1 when the shape doesn't match. +func countResults(result string) int { + if strings.TrimSpace(result) == "" { + return -1 + } + var env struct { + Results []json.RawMessage `json:"results"` + } + if err := json.Unmarshal([]byte(result), &env); err != nil || env.Results == nil { + return -1 + } + return len(env.Results) +} + +// hostOf returns the bare host (no leading www.) of a URL, or a short +// form of the raw string when it doesn't parse as a URL. +func hostOf(raw string) string { + if u, err := url.Parse(raw); err == nil && u.Host != "" { + return strings.TrimPrefix(u.Host, "www.") + } + return truncateStep(raw, 60) +} + +// truncateStep rune-safely caps s to max, appending an ellipsis when cut. 
+func truncateStep(s string, max int) string { + if max <= 0 { + return "" + } + r := []rune(s) + if len(r) <= max { + return s + } + if max == 1 { + return string(r[:1]) + } + return string(r[:max-1]) + "…" +} + +func plural(n int) string { + if n == 1 { + return "" + } + return "s" +} -- 2.52.0 From e76eed0011deffb29c3e98a7e3eb1bd21681da2f Mon Sep 17 00:00:00 2001 From: Steve Dudenhoeffer Date: Fri, 26 Jun 2026 20:45:10 -0400 Subject: [PATCH 6/8] =?UTF-8?q?P2:=20run.Executor=20=E2=80=94=20executus?= =?UTF-8?q?=20is=20runnable?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The capstone of the run kernel: run.Executor.Run(ctx, RunnableAgent, inv) ties model resolution + the tool registry + majordomo's agent loop + context compaction + run-bounding + step/audit instrumentation into one path, with every host concern behind the nil-safe run.Ports. - run/executor.go: New(Config{Registry, Models, Defaults, Ports, Compactor, ContextTokens, SystemHeader}) + Run -> Result{RunID, Output, Steps, Usage, Err}. Budget gate (pre-run), model resolve, Audit StartRun/recorder (satisfies RunTally, stamped on inv.RunState), toolbox build, step observer (zips tool calls/results -> emitter + recorder.OnStep/OnTool), V10 detached-MaxRuntime context with caller-cancel merged back, compaction wired from ContextTokens×ratio, audit Close + Budget Commit on a detached cleanup ctx. Zero Ports = a bounded in-memory run (gadfly's case). - run/executor_test.go: hermetic end-to-end run against majordomo's fake provider (hello-world), Budget-rejection (no model call), Audit-port wiring (StartRun + Close with terminal status/output). All green under -race. - examples/minimal upgraded to the real "hello, agentic world" (~15 lines: Configure tiers -> run.New -> Run -> print). README/CLAUDE.md updated. Remaining P2 follow-ups (incremental): wire Critic/Checkpointer/PaletteSource/ Delivery into the loop, multi-phase Pipelines, and the no-tools direct path. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- CLAUDE.md | 13 +- README.md | 18 ++- examples/minimal/main.go | 48 +++++-- run/executor.go | 274 +++++++++++++++++++++++++++++++++++++++ run/executor_test.go | 132 +++++++++++++++++++ 5 files changed, 461 insertions(+), 24 deletions(-) create mode 100644 run/executor.go create mode 100644 run/executor_test.go diff --git a/CLAUDE.md b/CLAUDE.md index 06ee6ed..86a7be9 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -43,12 +43,13 @@ CORE (majordomo + stdlib): fanout/ programmatic N×M swarm [P0 ✓] deliver/ output egress seam (+ Discard/Stdout) [P0 ✓] identity/ caller identity seams [P0 ✓] - run/ run-loop mechanics + RunnableAgent DTO + [P2 wip] - nil-safe run.Ports (Audit/Budget/Critic/ - Checkpointer/PaletteSource) + step - instrumentation (steps.go) done; the - agentexec+skillexec -> run.Executor MERGE - (consuming Ports) is the remaining P2 work [P2] + run/ run.Executor is RUNNABLE: model-resolve + [P2 core ✓] + toolbox + majordomo loop + compaction + + run-bounding (V10 detached timeout) + step/ + audit observers + Budget gate; RunnableAgent + DTO + nil-safe run.Ports. Follow-ups: wire + Critic/Checkpointer/PaletteSource/Delivery, + Phases, and the no-tools direct path [P2] dispatchguard/ loop/depth/fan-out caps [P0 ✓] pendingattach/ attachment dedupe [P0 ✓] tool/ registry + 3-stage permissions + ssrf [P1 ✓] diff --git a/README.md b/README.md index 367d652..80b53d8 100644 --- a/README.md +++ b/README.md @@ -31,15 +31,23 @@ bot) — mort and gadfly are the first two consumers (heavy and light). 
See [mort]: https://gitea.stevedudenhoeffer.com/steve/mort -**Available today (P0):** +**Available today:** +- `run/` — **executus is runnable.** `run.Executor` ties model resolution, the + tool registry, majordomo's agent loop, context compaction, run-bounding, and + step/audit instrumentation into one `Run(ctx, RunnableAgent, inv) Result`, with + every host concern behind a nil-safe `run.Ports` (Audit/Budget/Critic/ + Checkpointer/PaletteSource/Delivery). See `examples/minimal`. +- `model/` — config-driven tier resolution + failover over majordomo, with + pluggable `UsageSink`/`TraceSink` and `GenerateWith[T]` structured output. +- `tool/` — the tool registry + 3-stage permission model + SSRF guard. +- `compact/` — the per-run context compactor. - `lane/` — bounded worker pool with fair-share queueing (run- and provider-concurrency). - `fanout/` — programmatic N×M swarm with bounded global + per-key concurrency. -- `config/` — the host config seam (`Source`) with an env-var default. -- `deliver/` — the output-egress seam with `Discard`/`Stdout` defaults. -- `identity/` — caller-identity seams (`AdminPolicy`, `MemberResolver`). -- `dispatchguard/`, `pendingattach/`, `run/progress.go` — run-safety primitives. +- `config/`, `deliver/`, `identity/` — host seams (config / output / identity), + each with a shipped default. +- `dispatchguard/`, `pendingattach/` — run-safety primitives. ## Design diff --git a/examples/minimal/main.go b/examples/minimal/main.go index fdc8976..f184392 100644 --- a/examples/minimal/main.go +++ b/examples/minimal/main.go @@ -1,27 +1,49 @@ -// Command minimal demonstrates executus's standalone core primitives available -// today (P0): the config seam + bounded fan-out. The full zero-config "agentic -// in ~12 lines" example arrives once the model, tool, and run packages land -// (P1–P3). +// Command minimal is executus's "hello, agentic world": wire a model resolver, +// a tool registry, and the run executor, then run an agent. 
With no batteries +// (Audit/Budget/Critic/Checkpointer/Palette/Delivery all nil) this is a +// bounded, in-memory run — the light-host shape (gadfly's case). +// +// Run it with a provider key for the configured tier, e.g. +// +// ANTHROPIC_API_KEY=sk-... go run ./examples/minimal +// +// Override a tier from the environment without touching code, e.g. +// +// EXECUTUS_MODEL_TIER_FAST=openai/gpt-4o-mini ANTHROPIC_API_KEY= OPENAI_KEY=sk-... go run ./examples/minimal package main import ( "context" "fmt" + "log" "gitea.stevedudenhoeffer.com/steve/executus/config" - "gitea.stevedudenhoeffer.com/steve/executus/fanout" + "gitea.stevedudenhoeffer.com/steve/executus/model" + "gitea.stevedudenhoeffer.com/steve/executus/run" + "gitea.stevedudenhoeffer.com/steve/executus/tool" ) func main() { - cfg := config.Env("EXECUTUS_") // e.g. EXECUTUS_FANOUT_MAX_CONCURRENT=8 - max := cfg.Int("fanout.max_concurrent", 4) + // 1. Configure model tiers: live values come from the environment + // (EXECUTUS_MODEL_TIER_), falling back to these defaults. + model.Configure(config.Env("EXECUTUS_"), map[string]string{ + "fast": "anthropic/claude-haiku-4-5", + "thinking": "anthropic/claude-opus-4-8", + }, 0) - items := []string{"alpha", "beta", "gamma", "delta"} - results := fanout.Run(context.Background(), items, - fanout.Options[string]{MaxConcurrent: max}, - func(_ context.Context, s string) (int, error) { return len(s), nil }) + // 2. Build the executor: a tool registry + the model resolver. No batteries. + ex := run.New(run.Config{ + Registry: tool.NewRegistry(), + Models: model.ParseModelForContext, + }) - for _, r := range results { - fmt.Printf("%-6s -> %d (err=%v)\n", items[r.Index], r.Value, r.Err) + // 3. Run an agent and print its answer. 
+ res := ex.Run(context.Background(), + run.RunnableAgent{Name: "assistant", SystemPrompt: "You are concise.", ModelTier: "fast"}, + tool.Invocation{RunID: "demo-1", CallerID: "local"}, + "In one sentence, what is an agent harness?") + if res.Err != nil { + log.Fatalf("run failed: %v", res.Err) } + fmt.Println(res.Output) } diff --git a/run/executor.go b/run/executor.go new file mode 100644 index 0000000..c1470fb --- /dev/null +++ b/run/executor.go @@ -0,0 +1,274 @@ +package run + +import ( + "context" + "fmt" + "time" + + "gitea.stevedudenhoeffer.com/steve/majordomo/agent" + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" + + "gitea.stevedudenhoeffer.com/steve/executus/compact" + "gitea.stevedudenhoeffer.com/steve/executus/tool" +) + +// ModelResolver resolves a tier alias or concrete spec to a usable llm.Model +// and an enriched context (for usage attribution). model.ParseModelForContext +// satisfies it. +type ModelResolver func(ctx context.Context, tier string) (context.Context, llm.Model, error) + +// Defaults are the executor's fallback caps and loop guards, applied per run +// when the RunnableAgent leaves a field zero. 
+type Defaults struct { + MaxIterations int // tool-dispatch steps; default 12 + MaxRuntime time.Duration // wall-clock per run; default 60s + FallbackTier string // tier when the agent's is empty; default "fast" + MaxConsecutiveToolErrors int // loop guard; default 3 + MaxSameToolCallRepeats int // retry-storm guard; default 3 + CompactionThresholdRatio float64 // fraction of model context to compact at; default 0.7 +} + +func (d Defaults) withFallbacks() Defaults { + if d.MaxIterations <= 0 { + d.MaxIterations = 12 + } + if d.MaxRuntime <= 0 { + d.MaxRuntime = 60 * time.Second + } + if d.FallbackTier == "" { + d.FallbackTier = "fast" + } + if d.MaxConsecutiveToolErrors <= 0 { + d.MaxConsecutiveToolErrors = 3 + } + if d.MaxSameToolCallRepeats <= 0 { + d.MaxSameToolCallRepeats = 3 + } + if d.CompactionThresholdRatio <= 0 { + d.CompactionThresholdRatio = 0.7 + } + return d +} + +// Config wires an Executor. Registry + Models are required; everything else is +// optional and nil-safe — the zero Config beyond those yields a bounded, +// in-memory run with no persistence/audit/budget/critic/delegation/compaction +// (gadfly's case). +type Config struct { + Registry tool.Registry + Models ModelResolver + Defaults Defaults + Ports Ports + + // Compactor mints the per-run context-compaction hook. nil disables + // compaction. ContextTokens resolves a tier's model context-window (for + // the compaction threshold); nil — or a zero return — also disables it. + Compactor compact.CompactorFactory + ContextTokens func(tier string) int + + // SystemHeader is an optional platform header prepended to every agent's + // system prompt. + SystemHeader string +} + +// Executor runs a RunnableAgent through majordomo's agent loop with the wired +// Ports. Construct with New; safe for concurrent use across runs. +type Executor struct { + cfg Config +} + +// New builds an Executor. It panics if Registry or Models is nil — those are +// structural, not runtime, errors. 
+func New(cfg Config) *Executor { + if cfg.Registry == nil || cfg.Models == nil { + panic("run.New: Registry and Models are required") + } + cfg.Defaults = cfg.Defaults.withFallbacks() + return &Executor{cfg: cfg} +} + +// Result is one run's outcome. Err carries the run failure (if any); the other +// fields are populated best-effort even on error (partial output/steps/usage). +type Result struct { + RunID string + Output string + Steps []tool.Step + Usage llm.Usage + Err error +} + +// Run executes ra with the given invocation + input and returns the Result. It +// never propagates a panic; failures surface in Result.Err. +func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocation, input string) Result { + started := time.Now() + res := Result{RunID: inv.RunID} + + tier := ra.ModelTier + if tier == "" { + tier = e.cfg.Defaults.FallbackTier + } + maxIter := ra.MaxIterations + if maxIter <= 0 { + maxIter = e.cfg.Defaults.MaxIterations + } + maxRuntime := ra.MaxRuntime + if maxRuntime <= 0 { + maxRuntime = e.cfg.Defaults.MaxRuntime + } + + // Budget gate (pre-run): a rejected run makes no model call. + if e.cfg.Ports.Budget != nil { + if err := e.cfg.Ports.Budget.Check(ctx, inv.CallerID); err != nil { + res.Err = err + return res + } + } + + // Resolve the model (enriches ctx for usage attribution). + modelCtx, model, err := e.cfg.Models(ctx, tier) + if err != nil { + res.Err = fmt.Errorf("resolve model %q: %w", tier, err) + return res + } + ctx = modelCtx + + // Audit start (optional). The recorder satisfies RunTally; stamp it on the + // invocation so a self-status tool can read live progress. 
+ var rec RunRecorder + if e.cfg.Ports.Audit != nil { + rec = e.cfg.Ports.Audit.StartRun(ctx, RunInfo{ + RunID: inv.RunID, + SubjectID: ra.ID, + Name: ra.Name, + CallerID: inv.CallerID, + ChannelID: inv.ChannelID, + ParentRunID: inv.ParentRunID, + Inputs: inv.SkillInputs, + StartedAt: started, + }) + } + if rec != nil { + inv.RunState = NewRunStateAccessor(rec, maxIter, 0, started) + } + + // Build the toolbox from the agent's low-level tools. + toolbox, err := e.cfg.Registry.Build(ra.LowLevelTools, inv, tool.Visibility("private"), nil) + if err != nil { + res.Err = fmt.Errorf("build toolbox: %w", err) + e.finishAudit(ctx, rec, "error", res, started, res.Err) + return res + } + + // Step instrumentation: accumulate Result.Steps + fire inv.OnStep, and feed + // the audit recorder. majordomo's step observer hands us each completed + // iteration; we zip the model's tool calls with their executed results. + emitter := newStepEmitter(inv.OnStep) + stepObserver := func(s agent.Step) { + if rec != nil { + rec.OnStep(s.Index, s.Response) + } + var calls []llm.ToolCall + if s.Response != nil { + calls = s.Response.ToolCalls + } + for i, r := range s.Results { + var call llm.ToolCall + if i < len(calls) { + call = calls[i] + } + emitter.toolStart(ctx, call.Name, call.Arguments) + emitter.toolEnd(ctx, call, r.Content, r.IsError) + if rec != nil { + rec.OnTool(call, r.Content) + } + } + } + + // Run context: bound by MaxRuntime, detached from the caller's deadline so a + // lane/queue wait doesn't eat the run budget (mort's V10 lesson). Caller + // cancellation still propagates via MergeCancellation. 
+ runCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), maxRuntime) + defer cancel() + runCtx, mergeCancel := MergeCancellation(runCtx, ctx) + defer mergeCancel() + + opts := []agent.Option{ + agent.WithToolbox(toolbox), + agent.WithMaxSteps(maxIter), + agent.WithToolErrorLimits(e.cfg.Defaults.MaxConsecutiveToolErrors, e.cfg.Defaults.MaxSameToolCallRepeats), + agent.WithStepObserver(stepObserver), + } + if e.cfg.Compactor != nil && e.cfg.ContextTokens != nil { + if threshold := e.compactionThreshold(tier); threshold > 0 { + opts = append(opts, agent.WithCompactor(e.cfg.Compactor(threshold, nil))) + } + } + + ag := agent.New(model, e.systemPrompt(ra), opts...) + runRes, runErr := ag.Run(runCtx, input) + + status := "ok" + if runErr != nil { + status = "error" + } + if runRes != nil { + res.Output = runRes.Output + res.Usage = runRes.Usage + } + res.Steps = emitter.snapshot() + res.Err = runErr + + e.finishAudit(ctx, rec, status, res, started, runErr) + if e.cfg.Ports.Budget != nil { + e.cfg.Ports.Budget.Commit(detach(ctx), inv.CallerID, time.Since(started).Seconds()) + } + return res +} + +// finishAudit writes the terminal roll-up on a detached context so a cancelled +// run still records (mort's CleanupContextTimeout lesson). 
+func (e *Executor) finishAudit(ctx context.Context, rec RunRecorder, status string, res Result, started time.Time, runErr error) { + if rec == nil { + return + } + stats := RunStats{ + Status: status, + Output: res.Output, + ToolCalls: rec.ToolCallsCount(), + RuntimeSeconds: time.Since(started).Seconds(), + } + if runErr != nil { + stats.Error = runErr.Error() + } + stats.InputTokens, stats.OutputTokens, stats.ThinkingTokens = rec.TokenStats() + rec.Close(detach(ctx), stats) +} + +func (e *Executor) systemPrompt(ra RunnableAgent) string { + if e.cfg.SystemHeader == "" { + return ra.SystemPrompt + } + if ra.SystemPrompt == "" { + return e.cfg.SystemHeader + } + return e.cfg.SystemHeader + "\n\n" + ra.SystemPrompt +} + +// compactionThreshold returns the token threshold for the tier's model context +// window (ratio × limit), or 0 when the limit is unknown. +func (e *Executor) compactionThreshold(tier string) int { + max := e.cfg.ContextTokens(tier) + if max <= 0 { + return 0 + } + return int(float64(max) * e.cfg.Defaults.CompactionThresholdRatio) +} + +// detach derives a bounded cleanup context off ctx, detached from its +// cancellation, for post-run writes. The cancel is intentionally not returned; +// CleanupContextTimeout bounds the lifetime. 
+func detach(ctx context.Context) context.Context { + c, cancel := context.WithTimeout(context.WithoutCancel(ctx), CleanupContextTimeout) + _ = cancel // bounded by the timeout; nothing to cancel early + return c +} diff --git a/run/executor_test.go b/run/executor_test.go new file mode 100644 index 0000000..dc4101e --- /dev/null +++ b/run/executor_test.go @@ -0,0 +1,132 @@ +package run + +import ( + "context" + "errors" + "testing" + + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" + "gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake" + + "gitea.stevedudenhoeffer.com/steve/executus/tool" +) + +// fakeModels returns a ModelResolver backed by a fake provider scripted to +// reply with the given text (no tool calls — the loop terminates immediately). +func fakeModels(t *testing.T, reply string) ModelResolver { + t.Helper() + fp := fake.New("fake") + fp.Enqueue("test-model", fake.Reply(reply)) + m, err := fp.Model("test-model") + if err != nil { + t.Fatalf("fake model: %v", err) + } + return func(ctx context.Context, _ string) (context.Context, llm.Model, error) { + return ctx, m, nil + } +} + +// TestExecutorRunHelloWorld is the milestone: executus runs an agent end-to-end +// against the fake provider and returns its output. Proves the kernel is +// runnable with the zero Ports (no persistence/audit/budget/critic). 
+func TestExecutorRunHelloWorld(t *testing.T) { + ex := New(Config{ + Registry: tool.NewRegistry(), + Models: fakeModels(t, "hello from executus"), + }) + + res := ex.Run(context.Background(), + RunnableAgent{Name: "greeter", SystemPrompt: "be brief", ModelTier: "test-model"}, + tool.Invocation{RunID: "run-1", CallerID: "caller-1"}, + "say hi") + + if res.Err != nil { + t.Fatalf("run error: %v", res.Err) + } + if res.Output != "hello from executus" { + t.Fatalf("output = %q, want %q", res.Output, "hello from executus") + } + if res.RunID != "run-1" { + t.Errorf("RunID = %q, want run-1", res.RunID) + } +} + +// TestExecutorBudgetRejection: a Budget that denies makes no model call. +func TestExecutorBudgetRejection(t *testing.T) { + denied := errors.New("over budget") + var modelCalled bool + models := func(ctx context.Context, _ string) (context.Context, llm.Model, error) { + modelCalled = true + return ctx, nil, nil + } + ex := New(Config{ + Registry: tool.NewRegistry(), + Models: models, + Ports: Ports{Budget: budgetFunc{check: func(string) error { return denied }}}, + }) + + res := ex.Run(context.Background(), + RunnableAgent{ModelTier: "test-model"}, + tool.Invocation{RunID: "r", CallerID: "broke"}, "hi") + + if !errors.Is(res.Err, denied) { + t.Fatalf("err = %v, want budget denial", res.Err) + } + if modelCalled { + t.Error("model must not be resolved/called when budget denies") + } +} + +// TestExecutorAuditWiring: the Audit port receives StartRun + Close with the +// terminal status/output. 
+func TestExecutorAuditWiring(t *testing.T) { + rec := &captureRecorder{} + ex := New(Config{ + Registry: tool.NewRegistry(), + Models: fakeModels(t, "done"), + Ports: Ports{Audit: auditFunc{start: func(RunInfo) RunRecorder { return rec }}}, + }) + + res := ex.Run(context.Background(), + RunnableAgent{ModelTier: "test-model"}, + tool.Invocation{RunID: "r2", CallerID: "c"}, "go") + + if res.Err != nil { + t.Fatalf("run error: %v", res.Err) + } + if !rec.closed { + t.Fatal("recorder.Close was not called") + } + if rec.stats.Status != "ok" { + t.Errorf("close status = %q, want ok", rec.stats.Status) + } + if rec.stats.Output != "done" { + t.Errorf("close output = %q, want done", rec.stats.Output) + } +} + +// --- test doubles --- + +type budgetFunc struct{ check func(callerID string) error } + +func (b budgetFunc) Check(_ context.Context, callerID string) error { return b.check(callerID) } +func (b budgetFunc) Commit(context.Context, string, float64) {} + +type auditFunc struct{ start func(RunInfo) RunRecorder } + +func (a auditFunc) StartRun(_ context.Context, info RunInfo) RunRecorder { return a.start(info) } + +type captureRecorder struct { + closed bool + stats RunStats + steps int + tools int +} + +func (r *captureRecorder) TokenStats() (in, out, thinking int64) { return 0, 0, 0 } +func (r *captureRecorder) ToolCallsCount() int { return r.tools } +func (r *captureRecorder) OnStep(int, *llm.Response) { r.steps++ } +func (r *captureRecorder) OnTool(llm.ToolCall, string) { r.tools++ } +func (r *captureRecorder) LogEvent(string, map[string]any) {} +func (r *captureRecorder) LogError(string) {} +func (r *captureRecorder) Close(_ context.Context, s RunStats) { r.closed = true; r.stats = s } -- 2.52.0 From 69c2eb5f47c46464308d90ad937c69d7d2994e79 Mon Sep 17 00:00:00 2001 From: Steve Dudenhoeffer Date: Fri, 26 Jun 2026 21:42:46 -0400 Subject: [PATCH 7/8] fix: address verified gadfly P2 findings (9 real of 18) Independently verified all 18 gadfly findings against the code 
(18-agent fan-out). Fixed the 9 real ones; the other 9 were false-positive / hallucinated / valid-tradeoff (no change). High: - F1 nil model: a Models resolver returning (ctx,nil,nil) flowed into the agent loop and nil-panicked. Now a clean error (Run never panics). +test. - F9 compactor data-leak: renderTranscript sent tool-call args verbatim to the summarizer (a possibly-different provider/tier); secret-bearing tool args (mcp_call/email_send/http_*/webhook_*) are now redacted, with a doc note that result bodies still flow (summary needs them). Medium/minor: - F2 compactor error path returned the folded slice, not the original msgs (contradicting the documented non-fatal contract) -> return msgs. - F3 RunStats.Status only ok/error; now timeout (DeadlineExceeded) / cancelled (Canceled) via statusFor. +test. - F4 step-zip emitted empty-name "ghost" steps when results>calls; now pairs min(calls,results) only. - F5 SetIteration was never called -> RunState.Iteration always 0; the step observer now updates it each loop. - F6 matchPending fallback was LIFO; now FIFO (matches the per-key queue). - F7 estimateTokens had no default arm (future Part kinds counted as 0); unknown parts now counted conservatively. - F8 cloud_sync silently truncated >1MiB responses -> opaque JSON error; now a clear "response exceeded N bytes" via readCapped. - F12 step observer captured the caller ctx; now the merged runCtx. - F13 compaction onFire was nil (doc claimed it logged); now wired to audit LogEvent("compaction_fired"). - F11 (no pre-dispatch hook in majordomo) documented honestly as a known limitation; F18 UsageSink doc clarified cache tokens are subsets of input. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- compact/compactor.go | 46 ++++++++++++++++++++-- model/cloud_sync.go | 20 +++++++++- model/sink.go | 5 +++ run/executor.go | 92 ++++++++++++++++++++++++++++++++------------ run/executor_test.go | 36 +++++++++++++++++ run/steps.go | 36 +++++++++++------ 6 files changed, 193 insertions(+), 42 deletions(-) diff --git a/compact/compactor.go b/compact/compactor.go index 0749ada..91e5c24 100644 --- a/compact/compactor.go +++ b/compact/compactor.go @@ -180,8 +180,11 @@ func compactIfNeeded(ctx context.Context, cfg CompactorConfig, st *compactionSta summary, err := summariseMiddle(ctx, cfg, st.summaryText, middle) if err != nil { - // Non-fatal upstream: the agent loop sends the original slice. - return rendered, fmt.Errorf("compactor: summarise middle: %w", err) + // Non-fatal upstream: the agent loop sends the ORIGINAL slice. Return + // msgs, not `rendered` — on a second+ compaction `rendered` already + // carries a prior synthetic summary, which is not the documented + // "original slice" the loop expects on a compactor error. + return msgs, fmt.Errorf("compactor: summarise middle: %w", err) } st.summaryText = summary st.prefixEnd = endMiddle @@ -285,6 +288,14 @@ func estimateTokens(msgs []llm.Message) int { chars += len(v.Text) case llm.ImagePart: chars += 4096 + default: + // llm.Part is a sealed-but-extensible interface (future media + // kinds). Count an unknown part conservatively (like an image) + // rather than 0, so a transcript of unrecognised content can't + // silently slip under the compaction threshold and 400 the + // model. Bump this if a large new part kind lands. + _ = v + chars += 4096 } } for _, tc := range m.ToolCalls { @@ -302,9 +313,36 @@ func estimateTokens(msgs []llm.Message) int { // summarizer. const transcriptMessageCap = 2048 +// secretBearingTools name tools whose ARGUMENTS routinely carry credentials or +// message bodies (bearer tokens, API keys, recipients, request bodies). 
Their +// args are dropped before the transcript reaches the summarizer model — which +// may be a different provider/tier than the run model — mirroring the redaction +// run/steps.go applies to user-facing step summaries. http_* and webhook_* are +// matched by prefix below. +var secretBearingTools = map[string]bool{ + "mcp_call": true, + "email_send": true, +} + +// redactToolArgs returns a summariser-safe rendering of a tool call's args: +// "[redacted]" for known secret-bearing tools, the args verbatim otherwise. +func redactToolArgs(name, args string) string { + if secretBearingTools[name] || + strings.HasPrefix(name, "http_") || + strings.HasPrefix(name, "webhook_") { + return "[redacted]" + } + return args +} + // renderTranscript flattens a message slice to a plain-text transcript -// suitable for the summarisation prompt. Tool calls show name + args, +// suitable for the summarisation prompt. Tool calls show name + (redacted) args, // tool results show name + body. Empty fields are skipped. +// +// NOTE: tool-RESULT bodies are forwarded to the summarizer (the summary needs +// the findings). A host whose tool results may contain secrets and whose +// summarizer tier resolves to an untrusted provider should ensure that tier is +// trusted, or pre-sanitise results before they reach the agent loop. 
func renderTranscript(msgs []llm.Message) string { var sb strings.Builder for i, m := range msgs { @@ -314,7 +352,7 @@ func renderTranscript(msgs []llm.Message) string { sb.WriteString("\n") } for _, tc := range m.ToolCalls { - fmt.Fprintf(&sb, "tool_call name=%s args=%s\n", tc.Name, truncate(string(tc.Arguments), transcriptMessageCap)) + fmt.Fprintf(&sb, "tool_call name=%s args=%s\n", tc.Name, truncate(redactToolArgs(tc.Name, string(tc.Arguments)), transcriptMessageCap)) } for _, tr := range m.ToolResults { fmt.Fprintf(&sb, "tool_result name=%s body=%s\n", tr.Name, truncate(tr.Content, transcriptMessageCap)) diff --git a/model/cloud_sync.go b/model/cloud_sync.go index c214ca1..9abd00e 100644 --- a/model/cloud_sync.go +++ b/model/cloud_sync.go @@ -314,7 +314,7 @@ func (c *CloudOllamaLimitCache) fetchTags(ctx context.Context) ([]string, error) return nil, err } defer resp.Body.Close() - body, err := io.ReadAll(io.LimitReader(resp.Body, maxLimitCacheResponseBytes)) + body, err := readCapped(resp.Body) if err != nil { return nil, err } @@ -367,7 +367,7 @@ func (c *CloudOllamaLimitCache) fetchContextLength(ctx context.Context, modelNam return 0, err } defer resp.Body.Close() - respBody, err := io.ReadAll(io.LimitReader(resp.Body, maxLimitCacheResponseBytes)) + respBody, err := readCapped(resp.Body) if err != nil { return 0, err } @@ -456,3 +456,19 @@ func truncate(b []byte, n int) string { // (/api/tags, /api/show) so a misbehaving endpoint can't stream an unbounded // body before the 15s timeout fires. 1 MiB is far above any real response. const maxLimitCacheResponseBytes = 1 << 20 + +// readCapped reads up to maxLimitCacheResponseBytes from r and returns a clear +// error if the response EXCEEDS the cap — rather than silently truncating (as a +// bare io.LimitReader does) and letting downstream json.Unmarshal fail with an +// opaque "unexpected end of JSON input". It reads one extra byte to detect the +// overflow. 
+func readCapped(r io.Reader) ([]byte, error) { + body, err := io.ReadAll(io.LimitReader(r, maxLimitCacheResponseBytes+1)) + if err != nil { + return nil, err + } + if len(body) > maxLimitCacheResponseBytes { + return nil, fmt.Errorf("cloud_sync: response exceeded %d bytes", maxLimitCacheResponseBytes) + } + return body, nil +} diff --git a/model/sink.go b/model/sink.go index 47575e2..664fedb 100644 --- a/model/sink.go +++ b/model/sink.go @@ -16,6 +16,11 @@ import ( // UsageSink receives one record per successful Generate through a model parsed // by this package (ParseModelRequest / ParseModelForContext). Implement it to // meter or bill; the token detail mirrors majordomo's Response.Usage. +// +// IMPORTANT: cacheReadTokens and cacheWriteTokens are PORTIONS of inputTokens, +// not independent additive values (they let a sink price cached vs fresh input +// differently). A sink must NOT compute total = input+output+cacheRead+ +// cacheWrite — that double-counts the cached input. type UsageSink interface { Record(ctx context.Context, model string, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens int) } diff --git a/run/executor.go b/run/executor.go index c1470fb..53d6940 100644 --- a/run/executor.go +++ b/run/executor.go @@ -2,6 +2,7 @@ package run import ( "context" + "errors" "fmt" "time" @@ -130,11 +131,18 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio res.Err = fmt.Errorf("resolve model %q: %w", tier, err) return res } + if model == nil { + // A resolver returning (ctx, nil, nil) would otherwise nil-panic inside + // the agent loop; surface it as a clean error (Run never panics out). + res.Err = fmt.Errorf("resolve model %q: resolver returned a nil model", tier) + return res + } ctx = modelCtx // Audit start (optional). The recorder satisfies RunTally; stamp it on the // invocation so a self-status tool can read live progress. 
var rec RunRecorder + var stateAcc *RunStateAccessor if e.cfg.Ports.Audit != nil { rec = e.cfg.Ports.Audit.StartRun(ctx, RunInfo{ RunID: inv.RunID, @@ -148,7 +156,8 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio }) } if rec != nil { - inv.RunState = NewRunStateAccessor(rec, maxIter, 0, started) + stateAcc = NewRunStateAccessor(rec, maxIter, 0, started) + inv.RunState = stateAcc } // Build the toolbox from the agent's low-level tools. @@ -159,11 +168,27 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio return res } - // Step instrumentation: accumulate Result.Steps + fire inv.OnStep, and feed - // the audit recorder. majordomo's step observer hands us each completed - // iteration; we zip the model's tool calls with their executed results. + // Run context: bound by MaxRuntime, detached from the caller's deadline so a + // lane/queue wait doesn't eat the run budget (mort's V10 lesson). Caller + // cancellation still propagates via MergeCancellation. Created BEFORE the + // step observer so the observer forwards the merged run context (not a + // possibly-cancelled caller ctx) to OnStep consumers. + runCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), maxRuntime) + defer cancel() + runCtx, mergeCancel := MergeCancellation(runCtx, ctx) + defer mergeCancel() + + // Step instrumentation: accumulate Result.Steps + fire inv.OnStep, feed the + // audit recorder, and keep the live iteration counter fresh. majordomo's + // step observer hands us each completed iteration; we zip the model's tool + // calls with their executed results PAIRWISE — a result without a matching + // call (or a call without a result) is skipped rather than recorded as an + // empty-name "ghost" step. 
emitter := newStepEmitter(inv.OnStep) stepObserver := func(s agent.Step) { + if stateAcc != nil { + stateAcc.SetIteration(s.Index) + } if rec != nil { rec.OnStep(s.Index, s.Response) } @@ -171,27 +196,20 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio if s.Response != nil { calls = s.Response.ToolCalls } - for i, r := range s.Results { - var call llm.ToolCall - if i < len(calls) { - call = calls[i] - } - emitter.toolStart(ctx, call.Name, call.Arguments) - emitter.toolEnd(ctx, call, r.Content, r.IsError) + n := len(s.Results) + if len(calls) < n { + n = len(calls) + } + for i := 0; i < n; i++ { + call, r := calls[i], s.Results[i] + emitter.toolStart(runCtx, call.Name, call.Arguments) + emitter.toolEnd(runCtx, call, r.Content, r.IsError) if rec != nil { rec.OnTool(call, r.Content) } } } - // Run context: bound by MaxRuntime, detached from the caller's deadline so a - // lane/queue wait doesn't eat the run budget (mort's V10 lesson). Caller - // cancellation still propagates via MergeCancellation. - runCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), maxRuntime) - defer cancel() - runCtx, mergeCancel := MergeCancellation(runCtx, ctx) - defer mergeCancel() - opts := []agent.Option{ agent.WithToolbox(toolbox), agent.WithMaxSteps(maxIter), @@ -200,17 +218,27 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio } if e.cfg.Compactor != nil && e.cfg.ContextTokens != nil { if threshold := e.compactionThreshold(tier); threshold > 0 { - opts = append(opts, agent.WithCompactor(e.cfg.Compactor(threshold, nil))) + // Forward compaction events to the audit log (makes the + // CompactionEvent doc's "logged to the run trace" promise true). 
+ var onFire func(compact.CompactionEvent) + if rec != nil { + onFire = func(ev compact.CompactionEvent) { + rec.LogEvent("compaction_fired", map[string]any{ + "messages_before": ev.MessagesBefore, + "messages_after": ev.MessagesAfter, + "tokens_before": ev.TokensBefore, + "tokens_after": ev.TokensAfter, + }) + } + } + opts = append(opts, agent.WithCompactor(e.cfg.Compactor(threshold, onFire))) } } ag := agent.New(model, e.systemPrompt(ra), opts...) runRes, runErr := ag.Run(runCtx, input) - status := "ok" - if runErr != nil { - status = "error" - } + status := statusFor(runErr) if runRes != nil { res.Output = runRes.Output res.Usage = runRes.Usage @@ -225,6 +253,22 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio return res } +// statusFor maps a run error to a RunStats.Status, distinguishing a deadline +// (timeout) and a cancellation (cancelled — caller cancel or shutdown) from a +// generic error so audit consumers can tell them apart. +func statusFor(runErr error) string { + switch { + case runErr == nil: + return "ok" + case errors.Is(runErr, context.DeadlineExceeded): + return "timeout" + case errors.Is(runErr, context.Canceled): + return "cancelled" + default: + return "error" + } +} + // finishAudit writes the terminal roll-up on a detached context so a cancelled // run still records (mort's CleanupContextTimeout lesson). 
func (e *Executor) finishAudit(ctx context.Context, rec RunRecorder, status string, res Result, started time.Time, runErr error) { diff --git a/run/executor_test.go b/run/executor_test.go index dc4101e..250932c 100644 --- a/run/executor_test.go +++ b/run/executor_test.go @@ -3,6 +3,7 @@ package run import ( "context" "errors" + "fmt" "testing" "gitea.stevedudenhoeffer.com/steve/majordomo/llm" @@ -130,3 +131,38 @@ func (r *captureRecorder) OnTool(llm.ToolCall, string) { r.tools++ } func (r *captureRecorder) LogEvent(string, map[string]any) {} func (r *captureRecorder) LogError(string) {} func (r *captureRecorder) Close(_ context.Context, s RunStats) { r.closed = true; r.stats = s } + +// TestExecutorNilModelNoPanic: a resolver returning (ctx, nil, nil) yields a +// clean error, not a nil-pointer panic (gadfly F1, high severity). +func TestExecutorNilModelNoPanic(t *testing.T) { + ex := New(Config{ + Registry: tool.NewRegistry(), + Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { + return ctx, nil, nil // nil model, nil error + }, + }) + res := ex.Run(context.Background(), + RunnableAgent{ModelTier: "x"}, tool.Invocation{RunID: "r"}, "hi") + if res.Err == nil { + t.Fatal("expected an error for a nil model, got nil (would have panicked in the loop)") + } +} + +// TestStatusFor maps run errors to RunStats.Status (gadfly F3). 
+func TestStatusFor(t *testing.T) { + cases := []struct { + err error + want string + }{ + {nil, "ok"}, + {context.DeadlineExceeded, "timeout"}, + {context.Canceled, "cancelled"}, + {fmt.Errorf("wrapped: %w", context.DeadlineExceeded), "timeout"}, + {errors.New("boom"), "error"}, + } + for _, c := range cases { + if got := statusFor(c.err); got != c.want { + t.Errorf("statusFor(%v) = %q, want %q", c.err, got, c.want) + } + } +} diff --git a/run/steps.go b/run/steps.go index 670e09a..16d6d08 100644 --- a/run/steps.go +++ b/run/steps.go @@ -1,11 +1,10 @@ package run // steps.go — the per-run step emitter and the tool→step presentation -// mapping. This is the single place that turns the executor's two loop -// chokepoints (the pre-dispatch tool hook + the post-step observer in -// executor.go) into ordered tool.Step records: one per tool call, -// each with a stable id, an open-vocabulary kind, and a human -// present-tense summary that flips running→complete/error. +// mapping. This is the single place that turns the executor's post-step +// observer into ordered tool.Step records: one per tool call, each with a +// stable id, an open-vocabulary kind, and a human present-tense summary +// that flips running→complete/error. // // One source feeds two consumers (mirroring the OnEvent/OnToolEvent/ // PostRunResult pattern): the live tool.Invocation.OnStep callback @@ -13,6 +12,16 @@ package run // Because the Result accumulation does not depend on OnStep being set, // every surface — chat (JSON + SSE), Discord, cron, sub-agents — carries // steps; OnStep is needed only for live streaming. +// +// LIMITATION (current): majordomo exposes only a POST-step observer, so +// the executor calls toolStart+toolEnd back-to-back after each tool has +// already run. Steps are therefore recorded faithfully, but step.StartedAt +// ≈ EndedAt and the intermediate "running" phase is never observable to a +// live OnStep consumer. 
A pre-dispatch hook (wrapping each tool's handler +// to emit toolStart before execution, like mort's state-react decorator) +// is a follow-up that would restore real start timing + the running phase. +// The emitter already supports that two-call shape — toolStart and toolEnd +// are separate methods — so wiring it later is additive. import ( "context" @@ -35,8 +44,8 @@ const stepSummaryMaxLen = 200 // stepEmitter accumulates ordered steps for one run and fires the live // OnStep callback. // -// Concurrency: touched ONLY from the agent-loop goroutine. Both call -// sites (the hookToolbox `before` closure and the stepObserver) run +// Concurrency: touched ONLY from the agent-loop goroutine — the executor's +// stepObserver (and, once a pre-dispatch hook is wired, that hook) run // there; majordomo executes a step's tool calls sequentially, and // sub-agents build their own Invocation so they never reach this // emitter. Same single-goroutine contract as the audit Writer and the @@ -46,7 +55,7 @@ type stepEmitter struct { now func() time.Time seq int - steps []tool.Step // ordered; the snapshot for Result.Steps + steps []tool.Step // ordered; the snapshot for Result.Steps byID map[string]int // step id -> index into steps pending map[string][]string // correlation key -> queued running ids (FIFO) } @@ -126,9 +135,12 @@ func (e *stepEmitter) newStep(name string, args json.RawMessage) tool.Step { return step } -// matchPending pops the oldest running step id for (name, args). Falls -// back to the most recent still-running step of the same tool name when -// the args don't byte-match between start and end. Returns "" on no match. +// matchPending pops the oldest running step id for (name, args). Falls back to +// the OLDEST still-running step of the same tool name when the args don't +// byte-match between start and end (e.g. JSON key reordering). 
FIFO on the +// fallback too, consistent with the per-key queue pop above — closing the +// oldest avoids mis-correlating concurrent same-named calls. Returns "" on no +// match. func (e *stepEmitter) matchPending(name string, args json.RawMessage) string { key := corrKey(name, args) if q := e.pending[key]; len(q) > 0 { @@ -140,7 +152,7 @@ func (e *stepEmitter) matchPending(name string, args json.RawMessage) string { } return id } - for i := len(e.steps) - 1; i >= 0; i-- { + for i := 0; i < len(e.steps); i++ { if e.steps[i].Title == name && e.steps[i].Status == "running" { return e.steps[i].ID } -- 2.52.0 From 84e84f978563b74f298bb631bef4d933d5f47851 Mon Sep 17 00:00:00 2001 From: Steve Dudenhoeffer Date: Fri, 26 Jun 2026 22:01:07 -0400 Subject: [PATCH 8/8] ci(gadfly): cloud-only fleet (3 models, drop local Macs) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Measured on the P2 review: the local Macs (m1/m5) took 26–29 min with lens timeouts and found ZERO real bugs, while the two cloud models found every genuine finding in 6–12 min. Drop the Macs; add glm-5.2:cloud as a third cloud reviewer. Net: faster (~29→~12 min) and higher signal. Models: minimax-m3:cloud, deepseek-v4-flash:cloud, glm-5.2:cloud (ollama-cloud=3 concurrency). timeout-minutes 90→30. Co-Authored-By: Claude Opus 4.8 (1M context) --- .gitea/workflows/adversarial-review.yml | 36 ++++++++----------------- 1 file changed, 11 insertions(+), 25 deletions(-) diff --git a/.gitea/workflows/adversarial-review.yml b/.gitea/workflows/adversarial-review.yml index 6c5182b..9dd178e 100644 --- a/.gitea/workflows/adversarial-review.yml +++ b/.gitea/workflows/adversarial-review.yml @@ -4,8 +4,8 @@ # caches :latest, and this build is what carries foreman provider-type support) # as a specialist swarm and posts # ONE consolidated review comment as gitea-actions. Advisory only — never blocks a -# merge. 
This reviews executus PRs (same setup as mort: m1/m5 foreman locals + 2 -# cloud, 3-lens suite). Gadfly is a simple system — findings are advisory; double-check. +# merge. This reviews executus PRs with 3 ollama-cloud models (3-lens suite). Gadfly +# is a simple system — findings are advisory; always double-check before acting. name: Adversarial Review (Gadfly) @@ -41,35 +41,21 @@ jobs: || github.actor == 'fizi' || github.actor == 'dazed')) runs-on: ubuntu-latest - # Full fleet (2 cloud + 2 local Macs, all running concurrently) reviewing - # every PR with the 3-lens suite — the slow local lanes dominate wall time. - timeout-minutes: 90 + # 3 cloud models, all concurrent, 3-lens suite. ~12 min typical. + timeout-minutes: 30 steps: - uses: docker://gitea.stevedudenhoeffer.com/steve/gadfly:sha-6e3a83c env: GITEA_API: ${{ github.server_url }}/api/v1/repos/${{ github.repository }} GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }} OLLAMA_CLOUD_API_KEY: ${{ secrets.OLLAMA_CLOUD_API_KEY }} - # Local Macs, reached through their foreman queues (native Ollama on the - # wire). Gadfly's GADFLY_ENDPOINT_* form with the "foreman" provider - # type: GADFLY_ENDPOINT_M1 registers provider "m1", _M5 registers "m5", - # each building a foreman-preset Ollama client at the given URL. Values - # (host + token) live in gitea secrets, each of the form: - # foreman|https://| - # (converted from the komodo LLM_* DSNs foreman://@). - # REQUIRES a Gadfly image built with foreman provider-type support - # (the GADFLY_ENDPOINT "foreman|..." type); on an older image the m1/m5 - # lanes error with "unknown provider foreman". The HTTPS-only LLM_* - # foreman:// DSN is the alternative that needs no image rebuild. - # NOTE: the Mac behind each foreman must still be awake/reachable; if a - # box is offline, that model's comment shows an error and the others - # still post. (Gitea secrets aren't auto-exposed — map each explicitly.) 
- GADFLY_ENDPOINT_M1: ${{ secrets.GADFLY_ENDPOINT_M1 }} - GADFLY_ENDPOINT_M5: ${{ secrets.GADFLY_ENDPOINT_M5 }} - # 2 cloud (parallel) + M1 Pro + M5 Max — one consolidated comment each. - GADFLY_MODELS: "minimax-m3:cloud,deepseek-v4-flash:cloud,m1/qwen3:14b,m5/qwen3.6:35b-mlx" - # cloud runs 2 at once; each Mac one at a time; all three lanes parallel. - GADFLY_PROVIDER_CONCURRENCY: "ollama-cloud=2,m1=1,m5=1" + # executus uses CLOUD MODELS ONLY. The local Macs (m1/m5) were dropped: + # on a P2-review measurement they took 26–29 min (with lens timeouts) + # and contributed ZERO real findings — the two cloud models found every + # genuine bug in 6–12 min. Cloud-only is faster AND higher-signal. + # 3 cloud models, one consolidated comment each, all run in parallel. + GADFLY_MODELS: "minimax-m3:cloud,deepseek-v4-flash:cloud,glm-5.2:cloud" + GADFLY_PROVIDER_CONCURRENCY: "ollama-cloud=3" # Default => the 3-lens suite (security, correctness, error-handling). # Set the repo var GADFLY_SPECIALISTS to override (csv / "all" / "auto"). GADFLY_SPECIALISTS: ${{ vars.GADFLY_SPECIALISTS || 'security,correctness,error-handling' }} -- 2.52.0