12 Commits

Author SHA1 Message Date
steve 0acaa8c9a5 run: guard empty text part in runAgent + drop cross-repo doc ref (gadfly #16)
executus CI / test (pull_request) Successful in 1m46s
Every reviewer flagged that runAgent appended llm.Text(input) unconditionally, so
an image-only run (blank prompt) emitted an empty TextPart — inconsistent with the
sibling runSession.AttachImages which guards it. Mirror that guard
(strings.TrimSpace(input) != ""). Also:
- copy opts before appending (variadic backing array can have spare capacity; avoid
  aliasing a caller's slice).
- reword the doc comment to drop the mort-agentexec reference (executus is a
  standalone lib; a consumer name doesn't belong in its godoc).

Tests: image+text are co-located in ONE user message; an image-only run emits no
blank TextPart.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-28 01:11:15 -04:00
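Note on the `opts` copy in 0acaa8c9a5: the aliasing hazard can be reproduced with plain slices. This is a minimal sketch, not the executus code. `appendAliasing` and `appendSafe` are hypothetical helpers, and the strings stand in for `agent.RunOption` values.

```go
package main

import "fmt"

// appendAliasing appends in place. When opts has spare capacity, the new
// element is written into the caller's backing array.
func appendAliasing(opts []string, extra string) []string {
	return append(opts, extra)
}

// appendSafe uses a full slice expression to cap capacity at the current
// length, which forces append to allocate a fresh array.
func appendSafe(opts []string, extra string) []string {
	return append(opts[:len(opts):len(opts)], extra)
}

func main() {
	backing := []string{"a", "b", "c", "d"}
	callerOpts := backing[:2] // len 2, cap 4: spare capacity

	_ = appendSafe(callerOpts, "history")
	fmt.Println(backing[2]) // c (caller storage untouched)

	_ = appendAliasing(callerOpts, "history")
	fmt.Println(backing[2]) // history (caller storage overwritten)
}
```

The three-index form costs one allocation per call, which is likely negligible next to a model round trip.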
steve a35c176b42 run: fold inv.Images into the initial user message (multimodal opening turn)
executus CI / test (pull_request) Successful in 46s
Adversarial Review (Gadfly) / review (pull_request) Successful in 6m5s
The executor passed only the text `input` to majordomo's agent.Run, silently
dropping inv.Images — so a multimodal run (vision: chatbot @mention, chat API)
lost its images on the executus path. majordomo's Run input arg is text-only, so
fold the images into the first user message (text + image parts) via WithHistory
and call Run with empty input, mirroring mort agentexec's multimodal seeding. The
image-less path is unchanged (prompt passes straight through).

Tests: a run with Images carries the image bytes + prompt into the first model
request; the text-only path still reaches the model.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-28 00:37:53 -04:00
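Note on a35c176b42: the fold can be pictured without majordomo at all. The sketch below assumes stand-in types (`Part`, `TextPart`, `ImagePart`) and a hypothetical helper `openingParts`. It also includes the blank-text guard that the follow-up commit added, so it reflects the end state of the PR rather than this commit alone.

```go
package main

import (
	"fmt"
	"strings"
)

// Part, TextPart and ImagePart are stand-ins for the majordomo llm types.
type Part interface{ isPart() }

type TextPart struct{ Text string }

type ImagePart struct {
	MIME string
	Data []byte
}

func (TextPart) isPart()  {}
func (ImagePart) isPart() {}

// openingParts builds the parts of the first user message: the prompt text
// (only when it is non-blank) followed by every image, in order.
func openingParts(input string, images []ImagePart) []Part {
	parts := make([]Part, 0, len(images)+1)
	if strings.TrimSpace(input) != "" {
		parts = append(parts, TextPart{Text: input})
	}
	for _, img := range images {
		parts = append(parts, img)
	}
	return parts
}

func main() {
	img := ImagePart{MIME: "image/png", Data: []byte("PNGDATA")}
	fmt.Println(len(openingParts("describe this", []ImagePart{img}))) // 2
	fmt.Println(len(openingParts("   ", []ImagePart{img})))           // 1
}
```

Keeping text and images in one message is what lets the model read them as a single multimodal turn.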
steve 1cf46c9954 ci: track gadfly's v1 release tag instead of a pinned sha (#15)
executus CI / test (push) Successful in 52s
2026-06-28 04:08:30 +00:00
steve 56baac758d ci: inherit gadfly's default swarm (slim caller, re-pin @b02b11d) (#14)
executus CI / test (push) Successful in 50s
2026-06-28 02:48:25 +00:00
steve 5779035722 Merge pull request 'ci: subscribe to gadfly's reusable review workflow (cloud + Claude Code, no local)' (#13) from ci/gadfly-reusable into main
executus CI / test (push) Successful in 3m56s
2026-06-28 01:43:42 +00:00
Steve Dudenhoeffer 1a2a2364ec security: scope forwarded secrets + pin gadfly reusable to an immutable sha
executus CI / test (pull_request) Successful in 2m13s
Adversarial Review (Gadfly) / review (pull_request) Successful in 10m31s
Address the swarm's findings on this rollout:
- Replace `secrets: inherit` (which forwarded ALL repo secrets — registry/
  Komodo/Discord/DB creds the reviewer never uses) with explicit forwarding of
  only OLLAMA_CLOUD_API_KEY / CLAUDE_CODE_OAUTH_TOKEN / findings tokens.
  GITEA_TOKEN is the automatic job token (github.token in the reusable).
- Pin uses: ...@main -> @20a5c43 (immutable) so a push to gadfly can't change
  the code that runs with our forwarded secrets.

Requires gadfly's review-reusable.yml secrets contract (steve/gadfly#9, merged).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 21:18:59 -04:00
Steve Dudenhoeffer c08ce47fa6 ci: subscribe to gadfly's reusable review workflow (cloud + Claude Code, no local)
executus CI / test (pull_request) Successful in 47s
Adversarial Review (Gadfly) / review (pull_request) Successful in 12m29s
Replace the full self-contained stub with a thin caller of steve/gadfly's
reusable workflow, using gadfly's own dogfood config: 6 cloud models +
the Claude Code engine (sonnet, opus, opus:max). No local Macs / foreman.
Advisory only.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 20:03:00 -04:00
steve 784d5d7ce4 run: PostRun detached ctx + panic-isolated Cleanup (gadfly #12)
executus CI / test (pull_request) Successful in 45s
executus CI / test (push) Successful in 1m47s
Two convergent gadfly refinements on the PostRun wiring:
- PostRun now runs on detach(ctx), not the caller's ctx — a finished/cancelled
  caller no longer aborts artifact production (3-model: glm-5.2/minimax/deepseek).
- Cleanup is panic-isolated via safeCleanup (recover+log), matching runPostRun, so
  a misbehaving teardown can't clobber an otherwise-successful run (deepseek).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 18:33:41 -04:00
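Note on 784d5d7ce4: both refinements are small standard-library patterns. The sketch below is an assumption about their general shape, not the executus implementation. `safeCleanup` returns a bool purely so the behaviour is observable here, and `detached` and `detachedSurvives` are hypothetical names.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"
)

// safeCleanup runs fn and turns a panic into a log line. The return value
// reports whether a panic was recovered.
func safeCleanup(fn func()) (recovered bool) {
	defer func() {
		if r := recover(); r != nil {
			log.Printf("cleanup panicked: %v", r)
			recovered = true
		}
	}()
	fn()
	return false
}

// detached keeps the parent's values but ignores its cancellation; the new
// context is bounded only by its own timeout.
func detached(parent context.Context, d time.Duration) (context.Context, context.CancelFunc) {
	return context.WithTimeout(context.WithoutCancel(parent), d)
}

// detachedSurvives reports whether a detached context is still live after its
// parent has been cancelled.
func detachedSurvives() bool {
	caller, cancel := context.WithCancel(context.Background())
	cancel() // the caller has already gone away
	ctx, stop := detached(caller, time.Second)
	defer stop()
	return caller.Err() != nil && ctx.Err() == nil
}

func main() {
	fmt.Println(detachedSurvives())                            // true
	fmt.Println(safeCleanup(func() { panic("teardown bug") })) // true
}
```

`context.WithoutCancel` requires Go 1.21 or later.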
steve 4e179259de run: wire SessionToolFactory + PostRun artifacts + AttachImages
executus CI / test (pull_request) Successful in 1m49s
Adversarial Review (Gadfly) / review (pull_request) Successful in 5m19s
The session-tool TYPES already lived in tool/ (P4 move) but the executor never
used them. This wires them, unblocking artifact-producing host surfaces (mort's
chat API / chatbot / .skill / scaddy) to run on executus:

- run/session.go: steerMailbox (thread-safe message queue) + runSession
  (tool.AgentSession over it: AttachImages → a user-role multimodal message
  injected before the agent's next step) + runPostRun (panic-isolated hook call).
- executor: create the mailbox + set inv.AttachImages BEFORE the toolbox build;
  add inv.ExtraTools + a SessionToolFactory's per-run Tools to the toolbox; defer
  its Cleanup; merge the session mailbox with the critic's nudges into ONE
  WithSteer; after the run, call PostRun with the full transcript
  (runRes.Messages) → Result.PostRunResult (best-effort, never fails the run).
- run.Result += PostRunResult *tool.PostRunResult.
- dropped the now-dead criticBinding.steerOptions (superseded by drainSteer).

Tests: a factory whose PostRun emits an artifact from the output+transcript +
Cleanup lands on Result.PostRunResult; a factory-added tool is callable.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 18:13:16 -04:00
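Note on 4e179259de: a steer mailbox of this kind is essentially a mutex-guarded queue with a destructive read, and merging two sources into one steer callback is a small closure. The sketch below assumes a stand-in `Message` type and hypothetical names (`mailbox`, `mergeSteer`); the real `steerMailbox` may differ in detail.

```go
package main

import (
	"fmt"
	"sync"
)

// Message stands in for llm.Message.
type Message struct{ Role, Text string }

// mailbox is a mutex-guarded queue: producers push, the agent loop drains.
type mailbox struct {
	mu   sync.Mutex
	msgs []Message
}

func (m *mailbox) push(msg Message) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.msgs = append(m.msgs, msg)
}

// drain returns everything queued so far and empties the queue.
func (m *mailbox) drain() []Message {
	m.mu.Lock()
	defer m.mu.Unlock()
	out := m.msgs
	m.msgs = nil
	return out
}

// mergeSteer combines several drain functions into the single callback that a
// WithSteer-style option expects, preserving source order.
func mergeSteer(sources ...func() []Message) func() []Message {
	return func() []Message {
		var all []Message
		for _, src := range sources {
			all = append(all, src()...)
		}
		return all
	}
}

func main() {
	session, critic := &mailbox{}, &mailbox{}
	session.push(Message{Role: "user", Text: "image attached"})
	critic.push(Message{Role: "user", Text: "you seem stuck"})

	steer := mergeSteer(session.drain, critic.drain)
	fmt.Println(len(steer())) // 2
	fmt.Println(len(steer())) // 0
}
```

Because each drain empties its queue, a message is delivered to exactly one step.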
steve 82a816ae29 ci(gadfly): trim pool to the strong 6 (drop m5/qwen3.6, gemma4, gpt-oss, kimi-k2.7)
executus CI / test (push) Successful in 46s
Pool now: minimax-m3, glm-5.2, glm-5.1, deepseek-v4-pro, nemotron-3-super,
qwen3-coder:480b (all cloud, ollama-cloud=3). Removed the low-value reviewers +
the last local endpoint (m5).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 18:06:36 -04:00
steve be4bbbcad5 run: fix statusFor — don't relabel a generic error / caller-cancel as timeout (gadfly #11)
executus CI / test (pull_request) Successful in 47s
executus CI / test (push) Successful in 45s
The WithCancelCause+timer rewrite made MaxRuntime surface as Canceled (not
DeadlineExceeded), so statusFor's context.Cause(DeadlineExceeded) check could
relabel (a) a genuine run error as 'timeout' and (b) a caller cancel/deadline as
'timeout' (was 'cancelled'). Convergent gadfly finding (glm-5.2 + cluster).

Fix: keep MaxRuntime as WithTimeout (its DeadlineExceeded propagates → 'timeout',
preserving own-timeout vs caller-cancel), add a NESTED WithCancelCause layer only
for the kill. statusFor consults context.Cause ONLY for ErrCriticKill; everything
else is classified by the run error itself. Tests: generic-error-not-relabeled +
caller-cancel-stays-cancelled.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 17:00:26 -04:00
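Note on be4bbbcad5: the layering can be checked in isolation. In this sketch `errKill` stands in for `ErrCriticKill`, `classify` follows the ordering the commit message describes, and `newRunCtx`, `demoTimeout` and `demoKill` are hypothetical helpers. It omits the caller-cancellation merge.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errKill is a stand-in for the ErrCriticKill sentinel.
var errKill = errors.New("critic kill")

// classify consults the cancellation cause only for a kill; everything else
// is decided by the run error itself.
func classify(runCtx context.Context, runErr error) string {
	switch {
	case runErr == nil:
		return "ok"
	case errors.Is(context.Cause(runCtx), errKill):
		return "killed"
	case errors.Is(runErr, context.DeadlineExceeded):
		return "timeout"
	case errors.Is(runErr, context.Canceled):
		return "cancelled"
	default:
		return "error"
	}
}

// newRunCtx nests a cause-carrying cancel inside a plain timeout.
func newRunCtx(max time.Duration) (context.Context, context.CancelCauseFunc, context.CancelFunc) {
	timeoutCtx, cancelTimeout := context.WithTimeout(context.Background(), max)
	runCtx, cancelCause := context.WithCancelCause(timeoutCtx)
	return runCtx, cancelCause, cancelTimeout
}

// demoTimeout lets the outer timeout fire: DeadlineExceeded propagates down.
func demoTimeout() string {
	ctx, cancelCause, cancelTimeout := newRunCtx(10 * time.Millisecond)
	defer cancelTimeout()
	defer cancelCause(nil)
	<-ctx.Done()
	return classify(ctx, ctx.Err())
}

// demoKill cancels the inner layer with a wrapped kill cause.
func demoKill() string {
	ctx, cancelCause, cancelTimeout := newRunCtx(time.Minute)
	defer cancelTimeout()
	cancelCause(fmt.Errorf("%w: hung", errKill))
	return classify(ctx, ctx.Err())
}

func main() {
	fmt.Println(demoTimeout()) // timeout
	fmt.Println(demoKill())    // killed
}
```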
steve 390e6cf905 run: critic parity — fuller RecordStep + cause-carrying Kill (distinct status)
executus CI / test (pull_request) Successful in 46s
Adversarial Review (Gadfly) / review (pull_request) Successful in 22m30s
Completes the run-critic seam so a host adapter (mort's agentcritic) has full
fidelity, closing the two limitations gadfly surfaced on mort #1334.

- RecordStep(iter int, resp *llm.Response): the completed step's model response
  is now passed to the critic (was index-only), so a host that records a trace
  (mort's ProgressRecorder) can show what the agent actually produced, not just
  an iteration count. The executor forwards s.Response; the battery ignores it
  (its Progress is count-based).
- CriticHandle.KillCause() error + ErrCriticKill: the executor now distinguishes
  an explicit critic KILL from a natural backstop expiry. runCtx uses a
  cause-carrying cancel (WithCancelCause + a MaxRuntime timer cancelling with
  DeadlineExceeded); the deadline-watch cancels with ErrCriticKill when
  KillCause()!=nil, else DeadlineExceeded. statusFor reads context.Cause →
  killed / timeout / cancelled are now distinct (were all "cancelled"). The
  battery sets killCause from Decision.KillReason on a Kill.

Tests: statusFor "killed" case (cause=ErrCriticKill, err=Canceled); fake handle
+ battery RecordStep/KillCause signatures. Core stays battery-free.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 16:35:13 -04:00
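Note on 390e6cf905: with a cause-carrying cancel, `ctx.Err()` reports `context.Canceled` no matter which cause was supplied, so the cause has to be read through `context.Cause`. The sketch below shows that behaviour for a timer-driven backstop and for an explicit kill. `errKill` stands in for `ErrCriticKill`, and `withBackstop`, `observe`, `demoBackstop` and `demoKill` are hypothetical names.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// errKill stands in for ErrCriticKill.
var errKill = errors.New("critic kill")

// withBackstop returns a context that a timer cancels after max with
// DeadlineExceeded as the cause, plus the func for an earlier explicit kill.
func withBackstop(parent context.Context, max time.Duration) (context.Context, context.CancelCauseFunc) {
	ctx, cancel := context.WithCancelCause(parent)
	timer := time.AfterFunc(max, func() { cancel(context.DeadlineExceeded) })
	context.AfterFunc(ctx, func() { timer.Stop() }) // release the timer once ctx ends
	return ctx, cancel
}

// observe waits for ctx to finish and returns its Err and its cause.
func observe(ctx context.Context) (err, cause error) {
	<-ctx.Done()
	return ctx.Err(), context.Cause(ctx)
}

// demoBackstop: Err is Canceled even though the cause is DeadlineExceeded.
func demoBackstop() (errIsCanceled, causeIsDeadline bool) {
	ctx, cancel := withBackstop(context.Background(), 10*time.Millisecond)
	defer cancel(nil)
	err, cause := observe(ctx)
	return errors.Is(err, context.Canceled), errors.Is(cause, context.DeadlineExceeded)
}

// demoKill: Err is Canceled and the cause wraps the kill sentinel.
func demoKill() (errIsCanceled, causeIsKill bool) {
	ctx, cancel := withBackstop(context.Background(), time.Minute)
	cancel(fmt.Errorf("%w: no progress", errKill))
	err, cause := observe(ctx)
	return errors.Is(err, context.Canceled), errors.Is(cause, errKill)
}

func main() {
	fmt.Println(demoBackstop()) // true true
	fmt.Println(demoKill())     // true true
}
```

In both cases the run error alone cannot separate a timeout from a cancel, which is the ambiguity the later statusFor fix addresses.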
11 changed files with 521 additions and 100 deletions
+22 -60
@@ -1,11 +1,8 @@
- # Gadfly — agentic adversarial PR reviewer (https://gitea.stevedudenhoeffer.com/steve/gadfly).
- #
- # Runs the published Gadfly image (pinned to an immutable :sha- tag — act_runner
- # caches :latest, and this build is what carries foreman provider-type support)
- # as a specialist swarm and posts
- # ONE consolidated review comment as gitea-actions. Advisory only — never blocks a
- # merge. This reviews executus PRs with 3 ollama-cloud models (3-lens suite). Gadfly
- # is a simple system — findings are advisory; always double-check before acting.
+ # Gadfly adversarial review — subscribes to steve/gadfly's reusable workflow and
+ # INHERITS its default swarm. This stub holds only the triggers, the actor gate,
+ # secret forwarding, and the allow-list; the swarm config (models, lenses,
+ # concurrency, timeouts) lives centrally in gadfly's review-reusable.yml so it is
+ # tuned in ONE place. Advisory only — never blocks a merge.
name: Adversarial Review (Gadfly)
@@ -32,61 +29,26 @@ concurrency:
jobs:
review:
# Security: only trusted users may trigger a secret-bearing run via a PR
- # comment (pull_request + workflow_dispatch are already trusted). Mirrors
- # GADFLY_ALLOWED_USERS, the in-container belt-and-suspenders check.
+ # comment (pull_request + workflow_dispatch are already trusted). Mirrors the
+ # allowed_users input below (the in-container belt-and-suspenders check) — both
+ # lists must stay in sync; a workflow if: can't read a workflow_call input.
if: >-
github.event_name != 'issue_comment'
|| (github.event.issue.pull_request
&& (github.actor == 'steve'
|| github.actor == 'fizi'
|| github.actor == 'dazed'))
- runs-on: ubuntu-latest
- # Full fleet: 3 cloud (lens fan-out) + M1/M5 Macs via foreman. The slow local
- # lanes dominate wall time, so allow plenty of headroom.
- timeout-minutes: 90
- steps:
- - uses: docker://gitea.stevedudenhoeffer.com/steve/gadfly:sha-d7f364d
- env:
- GITEA_API: ${{ github.server_url }}/api/v1/repos/${{ github.repository }}
- GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
- OLLAMA_CLOUD_API_KEY: ${{ secrets.OLLAMA_CLOUD_API_KEY }}
- # Local Macs, reached through their foreman queues (native Ollama on the
- # wire). GADFLY_ENDPOINT_M5 registers provider "m5",
- # each a foreman-preset Ollama client at the secret's URL, of the form:
- # foreman|https://<foreman-host>|<token>
- # Needs an image with foreman provider-type support (this one). If a Mac
- # is offline that model's comment shows an error and the others still post.
- # (Gitea secrets aren't auto-exposed — map each explicitly.)
- GADFLY_ENDPOINT_M5: ${{ secrets.GADFLY_ENDPOINT_M5 }}
- # Full fleet: 3 cloud + M1 Pro + M5 Max. The Macs are back so the
- # gadfly-reports scoreboard can quantify whether they earn their keep
- # (they previously took 26-29 min for ZERO real findings — now measured).
- # Cloud concurrency lives in the LENSES: one cloud model at a time
- # (ollama-cloud=1) with its 3 lenses concurrent (LENS ollama-cloud=3) so
- # its comment lands sooner; each Mac runs one model, lenses serial (its
- # foreman queue serializes anyway). All three provider lanes run parallel.
- GADFLY_MODELS: "minimax-m3:cloud,glm-5.2:cloud,glm-5.1:cloud,kimi-k2.7-code:cloud,deepseek-v4-pro:cloud,nemotron-3-super:cloud,gpt-oss:120b-cloud,qwen3-coder:480b-cloud,gemma4:cloud,m5/qwen3.6:35b-mlx"
- GADFLY_PROVIDER_CONCURRENCY: "ollama-cloud=3,m5=1"
- GADFLY_PROVIDER_LENS_CONCURRENCY: "ollama-cloud=3"
- # Default => the 3-lens suite (security, correctness, error-handling).
- # Set the repo var GADFLY_SPECIALISTS to override (csv / "all" / "auto").
- GADFLY_SPECIALISTS: ${{ vars.GADFLY_SPECIALISTS || 'security,correctness,error-handling' }}
- # Per-lens deadline + bounded steps so the slow local models stay sane.
- GADFLY_TIMEOUT_SECS: "600"
- GADFLY_MAX_STEPS: "14"
- # Allow-list for the comment trigger (mirrors the job-level if: guard).
- GADFLY_ALLOWED_USERS: "steve,fizi,dazed"
- # --- findings telemetry: POST runs + findings to the gadfly-reports store ---
- # Advisory & off unless GADFLY_FINDINGS_URL is set; failures only log to
- # stderr and never affect the review. GADFLY_REPO / GADFLY_PR are derived
- # in-container; the URL + token are user-scope secrets.
- GADFLY_FINDINGS_URL: ${{ secrets.GADFLY_FINDINGS_URL }}
- GADFLY_FINDINGS_TOKEN: ${{ secrets.GADFLY_FINDINGS_TOKEN }}
- # --- event context (leave as-is) ---
- EVENT_NAME: ${{ github.event_name }}
- PR: ${{ github.event.pull_request.number || github.event.issue.number || github.event.inputs.pr_number }}
- PR_BRANCH: ${{ github.head_ref }}
- IS_DRAFT: ${{ github.event.pull_request.draft }}
- COMMENT_BODY: ${{ github.event.comment.body }}
- COMMENT_ID: ${{ github.event.comment.id }}
- ACTOR: ${{ github.actor }}
+ # Tracks gadfly's v1 release tag — a curated pointer re-moved on each release
+ # (unlike @main, which moves on every push). Central swarm tuning propagates
+ # here automatically; the tradeoff vs a full sha pin is that v1 is mutable.
+ uses: steve/gadfly/.gitea/workflows/review-reusable.yml@v1
+ # Least privilege: forward only the review secrets (not `secrets: inherit`,
+ # which would expose every repo secret). GITEA_TOKEN is the automatic token.
+ secrets:
+ OLLAMA_CLOUD_API_KEY: ${{ secrets.OLLAMA_CLOUD_API_KEY }}
+ CLAUDE_CODE_OAUTH_TOKEN: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }}
+ GADFLY_FINDINGS_URL: ${{ secrets.GADFLY_FINDINGS_URL }}
+ GADFLY_FINDINGS_TOKEN: ${{ secrets.GADFLY_FINDINGS_TOKEN }}
+ with:
+ # Consumer-specific allow-list; everything else is inherited.
+ allowed_users: "steve,fizi,dazed"
+19 -4
@@ -18,6 +18,7 @@ package critic
import (
"context"
"errors"
"log/slog"
"math"
"sync"
@@ -162,12 +163,15 @@ type handle struct {
iterations int
maxSteps int // current tool-dispatch ceiling (base MaxIterations, raised by RaiseStepsBy)
lastTool string
- killed bool // sticky: once an Escalator kills, no later decision un-kills it
+ killed bool // sticky: once an Escalator kills, no later decision un-kills it
+ killCause error // non-nil once killed; surfaced via KillCause for "killed" status
stopped bool
stopCh chan struct{}
}
- func (h *handle) RecordStep(iter int) {
+ func (h *handle) RecordStep(iter int, _ *llm.Response) {
+ // This battery's Progress tracks iteration count + activity, not per-step
+ // payload, so the response is unused here; a richer Escalator could record it.
h.mu.Lock()
h.iterations = iter
h.lastActivity = h.now()
@@ -204,6 +208,12 @@ func (h *handle) MaxSteps() int {
return h.maxSteps
}
func (h *handle) KillCause() error {
h.mu.Lock()
defer h.mu.Unlock()
return h.killCause
}
func (h *handle) Stop() {
h.mu.Lock()
if !h.stopped {
@@ -266,8 +276,13 @@ func (h *handle) tick(ctx context.Context) {
}
if d.Kill {
h.killed = true
- h.deadline = h.now() // immediate hard deadline → executor cancels
- return // ignore any Nudge/ExtendBy paired with a Kill
+ reason := d.KillReason
+ if reason == "" {
+ reason = "critic killed the run"
+ }
+ h.killCause = errors.New(reason) // surfaced via KillCause → "killed" status
+ h.deadline = h.now() // immediate hard deadline → executor cancels
+ return // ignore any Nudge/ExtendBy paired with a Kill
}
if len(d.Nudge) > 0 {
h.steer = append(h.steer, d.Nudge...)
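The kill bookkeeping in this hunk can be sketched on its own. The version below is a simplified assumption, not the battery's code: `decision` is a cut-down stand-in for the Escalator's Decision, `apply` is a hypothetical method, and the early return on an already-killed handle is one way to get the sticky behaviour the struct comment describes.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// decision is a cut-down stand-in for the Escalator's Decision.
type decision struct {
	Kill       bool
	KillReason string
}

type handle struct {
	mu        sync.Mutex
	killed    bool
	killCause error
}

// apply records a decision. A kill is sticky: once set, later decisions do
// not change the recorded cause.
func (h *handle) apply(d decision) {
	h.mu.Lock()
	defer h.mu.Unlock()
	if h.killed || !d.Kill {
		return
	}
	reason := d.KillReason
	if reason == "" {
		reason = "critic killed the run" // default when the escalator gives none
	}
	h.killed = true
	h.killCause = errors.New(reason)
}

// KillCause returns the recorded kill reason, or nil if the run was not killed.
func (h *handle) KillCause() error {
	h.mu.Lock()
	defer h.mu.Unlock()
	return h.killCause
}

func main() {
	h := &handle{}
	fmt.Println(h.KillCause()) // <nil>
	h.apply(decision{Kill: true})
	fmt.Println(h.KillCause()) // critic killed the run
}
```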
+1 -1
@@ -51,7 +51,7 @@ func TestMonitorEscalatesOncePerIdlePeriodAndExtends(t *testing.T) {
t.Error("deadline should have been extended past the original")
}
// A fresh step re-arms; another idle period escalates again.
- h.RecordStep(1)
+ h.RecordStep(1, nil)
time.Sleep(60 * time.Millisecond)
mu.Lock()
c2 := calls
+24 -14
@@ -2,9 +2,11 @@ package run
import (
"context"
"fmt"
"time"
"gitea.stevedudenhoeffer.com/steve/majordomo/agent"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
)
// criticDeadlineCheck is how often the deadline-watch goroutine polls the
@@ -21,12 +23,15 @@ type criticBinding struct {
}
// startCritic begins critic monitoring for this run when one is configured and
- // the agent enables it. It launches a goroutine that cancels runCtx (via cancel)
- // the moment the critic's hard deadline passes — the critic may extend that
- // deadline, so a healthy-but-slow run is given room while a hung one is killed.
- // Returns (nil, no-op stop) when there is no critic. The caller MUST defer the
- // returned stop.
- func (e *Executor) startCritic(runCtx context.Context, cancel context.CancelFunc, ra RunnableAgent, info RunInfo) (*criticBinding, func()) {
+ // the agent enables it. It launches a goroutine that cancels runCtx (via
+ // cancelCause) the moment the critic's hard deadline passes — the critic may
+ // extend that deadline, so a healthy-but-slow run is given room while a hung one
+ // is killed. When the deadline passes because the critic KILLED the run
+ // (KillCause() != nil), the cancellation cause is ErrCriticKill (→ status
+ // "killed"); when the backstop simply expired, it is context.DeadlineExceeded (→
+ // "timeout"). Returns (nil, no-op stop) when there is no critic. The caller MUST
+ // defer the returned stop.
+ func (e *Executor) startCritic(runCtx context.Context, cancelCause context.CancelCauseFunc, ra RunnableAgent, info RunInfo) (*criticBinding, func()) {
noop := func() {}
if e.cfg.Ports.Critic == nil || !ra.Critic.Enabled {
return nil, noop
@@ -55,9 +60,14 @@ func (e *Executor) startCritic(runCtx context.Context, cancel context.CancelFunc
return
case <-t.C:
// A zero deadline = no hard cap (not yet set); otherwise cancel
- // once we're at or past it.
+ // once we're at or past it, distinguishing an explicit kill from a
+ // natural backstop expiry so the run gets the right status.
  if d := h.Deadline(); !d.IsZero() && !time.Now().Before(d) {
- cancel()
+ if cause := h.KillCause(); cause != nil {
+ cancelCause(fmt.Errorf("%w: %s", ErrCriticKill, cause.Error()))
+ } else {
+ cancelCause(context.DeadlineExceeded)
+ }
return
}
}
@@ -69,9 +79,9 @@ func (e *Executor) startCritic(runCtx context.Context, cancel context.CancelFunc
}
}
- func (b *criticBinding) recordStep(iter int) {
+ func (b *criticBinding) recordStep(iter int, resp *llm.Response) {
  if b != nil {
- b.h.RecordStep(iter)
+ b.h.RecordStep(iter, resp)
}
}
@@ -104,11 +114,11 @@ func (b *criticBinding) maxStepsOption(base int) agent.Option {
})
}
- // steerOptions returns the agent RunOptions that drain the critic's steer
- // messages into the loop. Empty when there is no critic.
- func (b *criticBinding) steerOptions() []agent.RunOption {
+ // drainSteer returns the critic's queued steer messages (nil-safe), so the
+ // executor can merge them with the session steer mailbox into one WithSteer.
+ func (b *criticBinding) drainSteer() []llm.Message {
  if b == nil {
  return nil
  }
- return []agent.RunOption{agent.WithSteer(b.h.Steer)}
+ return b.h.Steer()
}
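The nil-safe pattern that drainSteer relies on is ordinary Go: a method with a pointer receiver may be called on a nil pointer as long as it checks before dereferencing. A minimal sketch with stand-in types (`Message` and `binding` are hypothetical):

```go
package main

import "fmt"

// Message stands in for llm.Message.
type Message struct{ Text string }

// binding stands in for criticBinding; queued plays the role of the critic's
// pending steer messages.
type binding struct{ queued []Message }

// drainSteer is safe to call on a nil *binding, so callers need no
// "is a critic configured?" branch.
func (b *binding) drainSteer() []Message {
	if b == nil {
		return nil
	}
	out := b.queued
	b.queued = nil
	return out
}

func main() {
	var none *binding // no critic configured
	fmt.Println(len(none.drainSteer())) // 0

	some := &binding{queued: []Message{{Text: "keep going"}}}
	fmt.Println(len(some.drainSteer())) // 1
}
```

Appending a nil slice to another slice is a no-op, which is why the merged steer callback can call this unconditionally.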
+8 -2
@@ -23,10 +23,16 @@ type fakeCriticHandle struct {
mu sync.Mutex
steps, tools, stops int
steered int
- maxSteps int // 0 => defer to the run's base MaxIterations
+ maxSteps int // 0 => defer to the run's base MaxIterations
+ killCause error // non-nil simulates a critic kill
  }
- func (h *fakeCriticHandle) RecordStep(int) { h.mu.Lock(); h.steps++; h.mu.Unlock() }
+ func (h *fakeCriticHandle) RecordStep(int, *llm.Response) { h.mu.Lock(); h.steps++; h.mu.Unlock() }
+ func (h *fakeCriticHandle) KillCause() error {
+ h.mu.Lock()
+ defer h.mu.Unlock()
+ return h.killCause
+ }
func (h *fakeCriticHandle) RecordToolStart(string, string) {
h.mu.Lock()
h.tools++
+107 -10
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"
"gitea.stevedudenhoeffer.com/steve/majordomo/agent"
@@ -101,6 +102,10 @@ type Result struct {
Steps []tool.Step
Usage llm.Usage
Err error
// PostRunResult carries artifacts produced by a SessionToolFactory's PostRun
// hook (rendered images, files). nil when no factory was set or PostRun
// returned nil. The host delivers these (e.g. mort's chat API / Discord).
PostRunResult *tool.PostRunResult
}
// Run executes ra with the given invocation + input and returns the Result. It
@@ -176,6 +181,12 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
inv.RunState = stateAcc
}
// Steer mailbox: lets session tools (via inv.AttachImages) feed multimodal
// messages into the running conversation before its next step. Created BEFORE
// the toolbox build so any tool's handler captures the live AttachImages seam.
mailbox := &steerMailbox{}
inv.AttachImages = (&runSession{mailbox: mailbox}).AttachImages
// Build the toolbox from the agent's low-level tools.
toolbox, err := e.cfg.Registry.Build(ra.LowLevelTools, inv, tool.Visibility("private"), nil)
if err != nil {
@@ -192,20 +203,56 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
return res
}
// Per-invocation ExtraTools + a SessionToolFactory's per-run tools, added on
// top of the agent's palette. The factory closes over the live session (the
// AttachImages mailbox); its PostRun hook (held for after the run) produces
// artifacts attached to res.PostRunResult, and its Cleanup is deferred. All
// nil-safe.
for _, t := range inv.ExtraTools {
if err := toolbox.Add(t); err != nil {
res.Err = fmt.Errorf("add extra tool: %w", err)
e.finishAudit(ctx, rec, "error", res, started, res.Err)
return res
}
}
var postRun func(ctx context.Context, transcript []llm.Message, output string, runErr error) *tool.PostRunResult
if inv.SessionToolFactory != nil {
st := inv.SessionToolFactory(&runSession{mailbox: mailbox})
if st.Cleanup != nil {
defer safeCleanup(st.Cleanup) // panic-isolated, like runPostRun
}
for _, t := range st.Tools {
if err := toolbox.Add(t); err != nil {
res.Err = fmt.Errorf("add session tool: %w", err)
e.finishAudit(ctx, rec, "error", res, started, res.Err)
return res
}
}
postRun = st.PostRun
}
// Run context: bound by MaxRuntime, detached from the caller's deadline so a
// lane/queue wait doesn't eat the run budget (mort's V10 lesson). Caller
// cancellation still propagates via MergeCancellation. Created BEFORE the
// step observer so the observer forwards the merged run context (not a
// possibly-cancelled caller ctx) to OnStep consumers.
- runCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), maxRuntime)
- defer cancel()
+ // MaxRuntime stays a WithTimeout so its DeadlineExceeded propagates through the
+ // child chain (→ "timeout"), preserving the run's-own-timeout vs caller-cancel
+ // distinction. A NESTED cause-carrying layer lets a critic kill surface as a
+ // distinct "killed" without disturbing that: only an ErrCriticKill cause is
+ // consulted in statusFor; a generic run error or a caller cancel is classified
+ // by the run error itself.
+ timeoutCtx, cancelTimeout := context.WithTimeout(context.WithoutCancel(ctx), maxRuntime)
+ defer cancelTimeout()
+ runCtx, cancelCause := context.WithCancelCause(timeoutCtx)
+ defer cancelCause(nil)
runCtx, mergeCancel := MergeCancellation(runCtx, ctx)
defer mergeCancel()
// Critic (optional): monitors the run for a stall, can nudge/extend/kill via
// its host Escalator. Its hard deadline is bound to runCtx (cancel on pass).
// nil-safe: no-op when no critic is configured or the agent doesn't enable it.
- critic, stopCritic := e.startCritic(runCtx, cancel, ra, info)
+ critic, stopCritic := e.startCritic(runCtx, cancelCause, ra, info)
defer stopCritic()
// Step instrumentation: accumulate Result.Steps + fire inv.OnStep, feed the
@@ -222,7 +269,7 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
if rec != nil {
rec.OnStep(s.Index, s.Response)
}
- critic.recordStep(s.Index) // keep the critic's activity clock fresh
+ critic.recordStep(s.Index, s.Response) // keep the critic's activity clock fresh + carry the step payload
var calls []llm.ToolCall
if s.Response != nil {
calls = s.Response.ToolCalls
@@ -271,9 +318,12 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
}
ag := agent.New(model, e.systemPrompt(ra), opts...)
- runRes, runErr := ag.Run(runCtx, input, critic.steerOptions()...)
+ // One WithSteer drains BOTH the session mailbox (a tool's AttachImages) and
+ // the critic's nudges before each step.
+ steer := func() []llm.Message { return append(mailbox.drain(), critic.drainSteer()...) }
+ runRes, runErr := runAgent(runCtx, ag, input, inv.Images, agent.WithSteer(steer))
- status := statusFor(runErr)
+ status := statusFor(runCtx, runErr)
if runRes != nil {
res.Output = runRes.Output
res.Usage = runRes.Usage
@@ -281,6 +331,19 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
res.Steps = emitter.snapshot()
res.Err = runErr
// PostRun: hand the SessionToolFactory's hook the full transcript (populated
// even on partial results) so it can produce artifacts. Best-effort +
// panic-isolated — a PostRun failure never fails an otherwise-successful run.
if postRun != nil {
var transcript []llm.Message
if runRes != nil {
transcript = runRes.Messages
}
// Detach from the caller's ctx: a finished/cancelled caller must not abort
// artifact production (the hook owns its own bounding, per its contract).
res.PostRunResult = runPostRun(detach(ctx), postRun, transcript, res.Output, runErr)
}
e.finishAudit(ctx, rec, status, res, started, runErr)
if e.cfg.Ports.Budget != nil {
e.cfg.Ports.Budget.Commit(detach(ctx), inv.CallerID, time.Since(started).Seconds())
@@ -289,13 +352,22 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
return res
}
- // statusFor maps a run error to a RunStats.Status, distinguishing a deadline
- // (timeout) and a cancellation (cancelled — caller cancel or shutdown) from a
- // generic error so audit consumers can tell them apart.
- func statusFor(runErr error) string {
+ // statusFor maps a run error to a RunStats.Status, distinguishing a critic kill
+ // (killed), a deadline (timeout), and a cancellation (cancelled — caller cancel
+ // or shutdown) from a generic error so audit consumers can tell them apart. The
+ // run context's cancellation cause carries the distinction (ErrCriticKill /
+ // DeadlineExceeded), since ctx.Err() alone only reports Canceled.
+ func statusFor(runCtx context.Context, runErr error) string {
switch {
case runErr == nil:
return "ok"
// Only the kill is recovered from the cancellation cause — a critic kill
// surfaces as a plain Canceled run error, so without this it'd read as
// "cancelled". Everything else is classified by the run error itself, so a
// genuine run error is never relabeled just because the context was later
// cancelled, and a caller cancel/deadline stays "cancelled" (not "timeout").
case errors.Is(context.Cause(runCtx), ErrCriticKill):
return "killed"
case errors.Is(runErr, context.DeadlineExceeded):
return "timeout"
case errors.Is(runErr, context.Canceled):
@@ -369,3 +441,28 @@ func detach(ctx context.Context) context.Context {
_ = cancel // bounded by the timeout; nothing to cancel early
return c
}
// runAgent dispatches the majordomo agent loop. majordomo's Run takes a text-only
// input arg, so when the invocation carries images they're folded into the first
// user message (text + image parts) via WithHistory and Run is called with an
// empty input — the model then sees a multimodal opening turn. The image-less path
// passes the prompt straight through.
//
// The text part is omitted when input is blank (image-only run), matching
// runSession.AttachImages so no empty TextPart is sent.
func runAgent(ctx context.Context, ag *agent.Agent, input string, images []llm.ImagePart, opts ...agent.RunOption) (*agent.Result, error) {
if len(images) == 0 {
return ag.Run(ctx, input, opts...)
}
parts := make([]llm.Part, 0, len(images)+1)
if strings.TrimSpace(input) != "" {
parts = append(parts, llm.Text(input))
}
for _, img := range images {
parts = append(parts, img)
}
// Copy opts before appending so a caller-supplied backing array is never
// mutated/aliased (the variadic slice can have spare capacity).
opts = append(opts[:len(opts):len(opts)], agent.WithHistory([]llm.Message{llm.UserParts(parts...)}))
return ag.Run(ctx, "", opts...)
}
+20 -7
@@ -148,20 +148,33 @@ func TestExecutorNilModelNoPanic(t *testing.T) {
}
}
- // TestStatusFor maps run errors to RunStats.Status (gadfly F3).
+ // TestStatusFor maps run errors + cancellation cause to RunStats.Status (gadfly F3).
func TestStatusFor(t *testing.T) {
bg := context.Background()
// A context cancelled with the critic-kill cause: ctx.Err() is Canceled, but
// context.Cause carries ErrCriticKill → "killed".
killCtx, killCancel := context.WithCancelCause(context.Background())
killCancel(fmt.Errorf("%w: hung", ErrCriticKill))
// A context cancelled with a non-kill cause must NOT relabel a genuine run
// error: a real error stays "error" even though the ctx was later cancelled.
cancelledCtx, cc := context.WithCancelCause(context.Background())
cc(context.DeadlineExceeded)
cases := []struct {
ctx context.Context
err error
want string
}{
- {nil, "ok"},
- {context.DeadlineExceeded, "timeout"},
- {context.Canceled, "cancelled"},
- {fmt.Errorf("wrapped: %w", context.DeadlineExceeded), "timeout"},
- {errors.New("boom"), "error"},
+ {bg, nil, "ok"},
+ {bg, context.DeadlineExceeded, "timeout"},
+ {bg, context.Canceled, "cancelled"},
+ {bg, fmt.Errorf("wrapped: %w", context.DeadlineExceeded), "timeout"},
+ {bg, errors.New("boom"), "error"},
+ {killCtx, context.Canceled, "killed"},
+ {cancelledCtx, errors.New("boom"), "error"}, // generic error not relabeled by cause
+ {cancelledCtx, context.Canceled, "cancelled"}, // caller cancel stays cancelled, not timeout
}
for _, c := range cases {
- if got := statusFor(c.err); got != c.want {
+ if got := statusFor(c.ctx, c.err); got != c.want {
t.Errorf("statusFor(%v) = %q, want %q", c.err, got, c.want)
}
}
+121
@@ -0,0 +1,121 @@
package run_test
import (
"context"
"strings"
"testing"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
"gitea.stevedudenhoeffer.com/steve/executus/run"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
// TestExecutorFoldsInitialImages: when the invocation carries Images, they're
// folded into the first user message (alongside the prompt text) instead of being
// dropped — majordomo's Run input arg is text-only, so the executor seeds the
// multimodal opening turn via history.
func TestExecutorFoldsInitialImages(t *testing.T) {
fp := fake.New("fake")
fp.Enqueue("m", fake.Reply("saw the image"))
m, _ := fp.Model("m")
img := llm.ImagePart{MIME: "image/png", Data: []byte("PNGDATA")}
inv := tool.Invocation{RunID: "r1", Images: []llm.ImagePart{img}}
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
})
res := ex.Run(context.Background(), run.RunnableAgent{ModelTier: "m"}, inv, "describe this")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
calls := fp.Calls()
if len(calls) == 0 {
t.Fatal("no model calls recorded")
}
// The text + image must be CO-LOCATED in a single user message (not split
// across two), so the model reads them as one multimodal turn.
coLocated := false
for _, msg := range calls[0].Request.Messages {
sawImage, sawText := false, false
for _, p := range msg.Parts {
switch pp := p.(type) {
case llm.ImagePart:
if string(pp.Data) == "PNGDATA" {
sawImage = true
}
case llm.TextPart:
if strings.Contains(pp.Text, "describe this") {
sawText = true
}
}
}
if sawImage && sawText {
coLocated = true
}
}
if !coLocated {
t.Error("image + prompt text were not folded into the SAME user message")
}
}
// TestExecutorImageOnlyNoBlankText: an image-only run (blank prompt) must NOT emit
// an empty TextPart — the message carries just the image, matching
// runSession.AttachImages's guard.
func TestExecutorImageOnlyNoBlankText(t *testing.T) {
fp := fake.New("fake")
fp.Enqueue("m", fake.Reply("saw it"))
m, _ := fp.Model("m")
inv := tool.Invocation{RunID: "r3", Images: []llm.ImagePart{{MIME: "image/png", Data: []byte("IMG")}}}
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
})
res := ex.Run(context.Background(), run.RunnableAgent{ModelTier: "m"}, inv, " ")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
for _, msg := range fp.Calls()[0].Request.Messages {
for _, p := range msg.Parts {
if tp, ok := p.(llm.TextPart); ok && strings.TrimSpace(tp.Text) == "" {
t.Error("image-only run emitted a blank TextPart")
}
}
}
}
// TestExecutorTextOnlyUnchanged: with no Images, the prompt flows through as the
// text input (regression guard that the fold path didn't break the common case).
func TestExecutorTextOnlyUnchanged(t *testing.T) {
fp := fake.New("fake")
fp.Enqueue("m", fake.Reply("ok"))
m, _ := fp.Model("m")
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
})
res := ex.Run(context.Background(), run.RunnableAgent{ModelTier: "m"}, tool.Invocation{RunID: "r2"}, "plain prompt")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
calls := fp.Calls()
if len(calls) == 0 {
t.Fatal("no model calls recorded")
}
sawText := false
for _, msg := range calls[0].Request.Messages {
for _, p := range msg.Parts {
if tp, ok := p.(llm.TextPart); ok && strings.Contains(tp.Text, "plain prompt") {
sawText = true
}
}
}
if !sawText {
t.Error("text-only prompt did not reach the model")
}
}
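The three tests above pin down one rule: when images are present, the prompt and the images share a single user message, and a blank prompt contributes no text part. A minimal sketch of that folding step follows. The `Part`, `TextPart`, `ImagePart`, and `Message` types and the `openingMessage` helper are hypothetical stand-ins for illustration; the real majordomo `llm` types and the executor's `runAgent` body are not shown in this diff.

```go
package main

import (
	"fmt"
	"strings"
)

// Part, TextPart, ImagePart and Message are hypothetical stand-ins for the
// majordomo llm types; the real definitions are not part of this diff.
type Part interface{ isPart() }

type TextPart struct{ Text string }

type ImagePart struct {
	MIME string
	Data []byte
}

func (TextPart) isPart()  {}
func (ImagePart) isPart() {}

type Message struct {
	Role  string
	Parts []Part
}

// openingMessage (hypothetical name) folds the prompt and the images into one
// user message. It reports ok=false when there are no images, so the caller
// can pass the prompt straight through as plain text input.
func openingMessage(prompt string, images []ImagePart) (msg Message, ok bool) {
	if len(images) == 0 {
		return Message{}, false
	}
	parts := make([]Part, 0, len(images)+1)
	// Guard: an image-only run must not carry an empty text part.
	if strings.TrimSpace(prompt) != "" {
		parts = append(parts, TextPart{Text: prompt})
	}
	for _, img := range images {
		parts = append(parts, img)
	}
	return Message{Role: "user", Parts: parts}, true
}

func main() {
	img := ImagePart{MIME: "image/png", Data: []byte("PNGDATA")}

	msg, _ := openingMessage("describe this", []ImagePart{img})
	fmt.Println(len(msg.Parts)) // text + image in the same message

	msg, _ = openingMessage("   ", []ImagePart{img})
	fmt.Println(len(msg.Parts)) // image only, no blank text part
}
```

Returning `ok=false` for the image-less case keeps the common text-only path untouched, which is what TestExecutorTextOnlyUnchanged guards.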
+17 -2
@@ -2,6 +2,7 @@ package run
import (
"context"
"errors"
"time"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
@@ -9,6 +10,12 @@ import (
"gitea.stevedudenhoeffer.com/steve/executus/deliver"
)
// ErrCriticKill is the cancellation cause the executor stamps on a run the
// critic kills, so a critic kill surfaces as a distinct "killed" status (vs a
// backstop "timeout" or a caller "cancelled"). A host CriticHandle signals a
// kill via KillCause(); the executor wraps that reason with this sentinel.
var ErrCriticKill = errors.New("run: critic killed the run")
// Ports are the host seams the run executor consumes. Every field is nil-safe:
// a light host passes the zero Ports and gets a bounded, in-memory run with no
// persistence, audit, budget, critic, delegation, or delivery — which is
@@ -123,8 +130,10 @@ type Critic interface {
// methods (the critic battery's handle guards its state with a mutex).
type CriticHandle interface {
// RecordStep / RecordToolStart keep the critic's activity clock fresh so a
- // healthy-but-slow run is not mistaken for a hang.
- RecordStep(iter int)
+ // healthy-but-slow run is not mistaken for a hang. RecordStep also carries the
+ // completed step's model response (nil-safe) so the critic's Trace can show
+ // what the agent actually produced, not just an iteration count.
+ RecordStep(iter int, resp *llm.Response)
RecordToolStart(name, args string)
// Steer returns any messages the critic wants injected into the loop (a
// nudge), drained before each step — matches majordomo agent.WithSteer.
@@ -137,6 +146,12 @@ type CriticHandle interface {
// healthy-but-long run's iteration budget mid-flight. Return <= 0 to defer to
// the run's base MaxIterations.
MaxSteps() int
// KillCause returns a non-nil reason iff the critic has decided to KILL this
// run (as opposed to letting the hard-deadline backstop expire). The executor
// reads it when the deadline passes: non-nil → cancel the run with
// ErrCriticKill (status "killed"); nil → the backstop expired naturally
// (status "timeout"). Hosts that never distinguish the two may return nil.
KillCause() error
// Stop ends monitoring when the run finishes.
Stop()
}
+88
@@ -0,0 +1,88 @@
package run
import (
"context"
"log/slog"
"strings"
"sync"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
// runPostRun invokes a SessionToolFactory's PostRun hook with panic isolation:
// a PostRun panic (or a slow artifact build that the hook mishandles) must not
// fail an otherwise-successful run — artifacts are best-effort, the agent's text
// output is the source of truth.
func runPostRun(ctx context.Context,
hook func(context.Context, []llm.Message, string, error) *tool.PostRunResult,
transcript []llm.Message, output string, runErr error) (prr *tool.PostRunResult) {
defer func() {
if r := recover(); r != nil {
slog.Error("run: PostRun hook panicked; no artifacts produced", "panic", r)
prr = nil
}
}()
return hook(ctx, transcript, output, runErr)
}
// steerMailbox is a thread-safe queue of messages a session tool (via
// tool.Invocation.AttachImages) wants injected into the agent loop before its
// next step — the same WithSteer mechanism the critic uses for nudges, exposed
// to ordinary tools so they can show the model content (e.g. a rendered
// preview) it must SEE, not just be told about.
type steerMailbox struct {
mu sync.Mutex
msgs []llm.Message
}
func (m *steerMailbox) push(msg llm.Message) {
m.mu.Lock()
m.msgs = append(m.msgs, msg)
m.mu.Unlock()
}
// drain returns and clears the queued messages (nil when empty).
func (m *steerMailbox) drain() []llm.Message {
m.mu.Lock()
defer m.mu.Unlock()
if len(m.msgs) == 0 {
return nil
}
out := m.msgs
m.msgs = nil
return out
}
// runSession implements tool.AgentSession over a steer mailbox: AttachImages
// queues a user-role multimodal message the agent loop injects before its next
// step. Replaces legacy agentkit's Agent.AttachImages — majordomo's *agent.Agent
// is immutable mid-run, so mutation flows through the run-scoped steer mailbox.
type runSession struct{ mailbox *steerMailbox }
func (s *runSession) AttachImages(text string, images ...llm.ImagePart) {
parts := make([]llm.Part, 0, len(images)+1)
if strings.TrimSpace(text) != "" {
parts = append(parts, llm.Text(text))
}
for _, img := range images {
parts = append(parts, img)
}
if len(parts) == 0 {
return
}
s.mailbox.push(llm.UserParts(parts...))
}
// safeCleanup runs a SessionTools.Cleanup with panic isolation, so a misbehaving
// teardown (temp-dir removal, handle close) can't clobber an otherwise-successful
// run via the executor's top-level recover.
func safeCleanup(fn func()) {
defer func() {
if r := recover(); r != nil {
slog.Error("run: session Cleanup panicked", "panic", r)
}
}()
fn()
}
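The steer mailbox above is a mutex-guarded queue: tools push from their own goroutines, and the agent loop drains everything in one call before its next step. A self-contained sketch of that push/drain contract follows, using plain strings in place of `llm.Message`. The `mailbox` type here is a simplified stand-in, not the package's `steerMailbox`.

```go
package main

import (
	"fmt"
	"sync"
)

// mailbox is a simplified stand-in for steerMailbox, holding strings instead
// of llm.Message values.
type mailbox struct {
	mu   sync.Mutex
	msgs []string
}

func (m *mailbox) push(s string) {
	m.mu.Lock()
	m.msgs = append(m.msgs, s)
	m.mu.Unlock()
}

// drain hands the queued messages to the caller and resets the queue, so a
// message is delivered to exactly one step. It returns nil when empty.
func (m *mailbox) drain() []string {
	m.mu.Lock()
	defer m.mu.Unlock()
	if len(m.msgs) == 0 {
		return nil
	}
	out := m.msgs
	m.msgs = nil // later pushes allocate a fresh slice; out is never aliased
	return out
}

func main() {
	var mb mailbox
	var wg sync.WaitGroup

	// Three tools attach content concurrently.
	for i := 0; i < 3; i++ {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			mb.push(fmt.Sprintf("preview %d", n))
		}(i)
	}
	wg.Wait()

	fmt.Println(len(mb.drain())) // all three messages arrive in one drain
	fmt.Println(mb.drain() == nil)
}
```

Setting the field to nil rather than truncating it with `m.msgs[:0]` means the returned slice and the next batch never share a backing array.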
+94
@@ -0,0 +1,94 @@
package run_test
import (
"context"
"testing"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
"gitea.stevedudenhoeffer.com/steve/executus/run"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
// TestSessionToolFactoryPostRun: a SessionToolFactory's PostRun hook produces an
// artifact (from the run output + transcript) that lands on Result.PostRunResult,
// and its Cleanup is deferred.
func TestSessionToolFactoryPostRun(t *testing.T) {
fp := fake.New("fake")
fp.Enqueue("m", fake.Reply("hello artifacts"))
m, _ := fp.Model("m")
cleanupCalled := false
inv := tool.Invocation{
RunID: "r1",
SessionToolFactory: func(_ tool.AgentSession) tool.SessionTools {
return tool.SessionTools{
PostRun: func(_ context.Context, transcript []llm.Message, output string, _ error) *tool.PostRunResult {
return &tool.PostRunResult{
Artifacts: []tool.Artifact{{Name: "out.txt", MimeType: "text/plain", Data: []byte(output)}},
Metadata: map[string]any{"transcript_len": len(transcript)},
}
},
Cleanup: func() { cleanupCalled = true },
}
},
}
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
})
res := ex.Run(context.Background(), run.RunnableAgent{ModelTier: "m"}, inv, "go")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
if res.PostRunResult == nil {
t.Fatal("Result.PostRunResult is nil — PostRun hook not invoked / not attached")
}
if n := len(res.PostRunResult.Artifacts); n != 1 {
t.Fatalf("artifacts = %d, want 1", n)
}
a := res.PostRunResult.Artifacts[0]
if a.Name != "out.txt" || string(a.Data) != "hello artifacts" {
t.Errorf("artifact = {%q, %q}", a.Name, string(a.Data))
}
if tl, _ := res.PostRunResult.Metadata["transcript_len"].(int); tl < 1 {
t.Errorf("transcript not passed to PostRun (len=%d)", tl)
}
if !cleanupCalled {
t.Error("Cleanup was not deferred/called")
}
}
// TestSessionToolFactoryAddsTool: tools the factory returns join the run's
// toolbox and are callable by the model.
func TestSessionToolFactoryAddsTool(t *testing.T) {
fp := fake.New("fake")
fp.Enqueue("m",
fake.ReplyWith(llm.Response{ToolCalls: []llm.ToolCall{{ID: "c1", Name: "render", Arguments: []byte(`{}`)}}}),
fake.Reply("rendered"),
)
m, _ := fp.Model("m")
toolCalled := false
renderTool := llm.DefineTool("render", "render a preview",
func(_ context.Context, _ struct{}) (any, error) { toolCalled = true; return "ok", nil })
inv := tool.Invocation{
RunID: "r2",
SessionToolFactory: func(_ tool.AgentSession) tool.SessionTools {
return tool.SessionTools{Tools: []llm.Tool{renderTool}}
},
}
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
})
res := ex.Run(context.Background(),
run.RunnableAgent{ModelTier: "m", MaxIterations: 5}, inv, "go")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
if !toolCalled {
t.Error("session-factory tool was not added to the toolbox / not called")
}
}
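The tests above cover the successful PostRun path. The panic-isolated path, where a misbehaving hook must not fail the run, relies on a named result that the deferred recover can overwrite. A minimal sketch of that pattern follows; `result` and `safeHook` are hypothetical names for illustration, and the hook signature is simplified from the one in the diff.

```go
package main

import "fmt"

// result is a hypothetical stand-in for tool.PostRunResult.
type result struct{ Artifacts []string }

// safeHook calls hook and converts a panic into a nil result. The result is
// named so the deferred function can set it after the panic unwinds.
func safeHook(hook func(output string) *result, output string) (res *result) {
	defer func() {
		if r := recover(); r != nil {
			res = nil // artifacts are best-effort; the run itself still succeeds
		}
	}()
	return hook(output)
}

func main() {
	good := safeHook(func(out string) *result {
		return &result{Artifacts: []string{out}}
	}, "hello artifacts")

	bad := safeHook(func(string) *result {
		panic("artifact build failed")
	}, "hello artifacts")

	fmt.Println(good != nil, bad == nil)
}
```

A test along these lines would pass a panicking PostRun hook through the executor and assert that `res.Err` stays nil while `res.PostRunResult` is nil.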