16 Commits

Author SHA1 Message Date
steve 0acaa8c9a5 run: guard empty text part in runAgent + drop cross-repo doc ref (gadfly #16)
executus CI / test (pull_request) Successful in 1m46s
Every reviewer flagged that runAgent appended llm.Text(input) unconditionally, so
an image-only run (blank prompt) emitted an empty TextPart — inconsistent with the
sibling runSession.AttachImages which guards it. Mirror that guard
(strings.TrimSpace(input) != ""). Also:
- copy opts before appending (variadic backing array can have spare capacity; avoid
  aliasing a caller's slice).
- reword the doc comment to drop the mort-agentexec reference (executus is a
  standalone lib; a consumer name doesn't belong in its godoc).

Tests: image+text are co-located in ONE user message; an image-only run emits no
blank TextPart.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-28 01:11:15 -04:00
steve a35c176b42 run: fold inv.Images into the initial user message (multimodal opening turn)
executus CI / test (pull_request) Successful in 46s
Adversarial Review (Gadfly) / review (pull_request) Successful in 6m5s
The executor passed only the text `input` to majordomo's agent.Run, silently
dropping inv.Images — so a multimodal run (vision: chatbot @mention, chat API)
lost its images on the executus path. majordomo's Run input arg is text-only, so
fold the images into the first user message (text + image parts) via WithHistory
and call Run with empty input, mirroring mort agentexec's multimodal seeding. The
image-less path is unchanged (prompt passes straight through).

Tests: a run with Images carries the image bytes + prompt into the first model
request; the text-only path still reaches the model.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-28 00:37:53 -04:00
steve 1cf46c9954 ci: track gadfly's v1 release tag instead of a pinned sha (#15)
executus CI / test (push) Successful in 52s
2026-06-28 04:08:30 +00:00
steve 56baac758d ci: inherit gadfly's default swarm (slim caller, re-pin @b02b11d) (#14)
executus CI / test (push) Successful in 50s
2026-06-28 02:48:25 +00:00
steve 5779035722 Merge pull request 'ci: subscribe to gadfly's reusable review workflow (cloud + Claude Code, no local)' (#13) from ci/gadfly-reusable into main
executus CI / test (push) Successful in 3m56s
2026-06-28 01:43:42 +00:00
Steve Dudenhoeffer 1a2a2364ec security: scope forwarded secrets + pin gadfly reusable to an immutable sha
executus CI / test (pull_request) Successful in 2m13s
Adversarial Review (Gadfly) / review (pull_request) Successful in 10m31s
Address the swarm's findings on this rollout:
- Replace `secrets: inherit` (which forwarded ALL repo secrets — registry/
  Komodo/Discord/DB creds the reviewer never uses) with explicit forwarding of
  only OLLAMA_CLOUD_API_KEY / CLAUDE_CODE_OAUTH_TOKEN / findings tokens.
  GITEA_TOKEN is the automatic job token (github.token in the reusable).
- Pin uses: ...@main -> @20a5c43 (immutable) so a push to gadfly can't change
  the code that runs with our forwarded secrets.

Requires gadfly's review-reusable.yml secrets contract (steve/gadfly#9, merged).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 21:18:59 -04:00
Steve Dudenhoeffer c08ce47fa6 ci: subscribe to gadfly's reusable review workflow (cloud + Claude Code, no local)
executus CI / test (pull_request) Successful in 47s
Adversarial Review (Gadfly) / review (pull_request) Successful in 12m29s
Replace the full self-contained stub with a thin caller of steve/gadfly's
reusable workflow, using gadfly's own dogfood config: 6 cloud models +
the Claude Code engine (sonnet, opus, opus:max). No local Macs / foreman.
Advisory only.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 20:03:00 -04:00
steve 784d5d7ce4 run: PostRun detached ctx + panic-isolated Cleanup (gadfly #12)
executus CI / test (pull_request) Successful in 45s
executus CI / test (push) Successful in 1m47s
Two convergent gadfly refinements on the PostRun wiring:
- PostRun now runs on detach(ctx), not the caller's ctx — a finished/cancelled
  caller no longer aborts artifact production (3-model: glm-5.2/minimax/deepseek).
- Cleanup is panic-isolated via safeCleanup (recover+log), matching runPostRun, so
  a misbehaving teardown can't clobber an otherwise-successful run (deepseek).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 18:33:41 -04:00
steve 4e179259de run: wire SessionToolFactory + PostRun artifacts + AttachImages
executus CI / test (pull_request) Successful in 1m49s
Adversarial Review (Gadfly) / review (pull_request) Successful in 5m19s
The session-tool TYPES already lived in tool/ (P4 move) but the executor never
used them. This wires them, unblocking artifact-producing host surfaces (mort's
chat API / chatbot / .skill / scaddy) to run on executus:

- run/session.go: steerMailbox (thread-safe message queue) + runSession
  (tool.AgentSession over it: AttachImages → a user-role multimodal message
  injected before the agent's next step) + runPostRun (panic-isolated hook call).
- executor: create the mailbox + set inv.AttachImages BEFORE the toolbox build;
  add inv.ExtraTools + a SessionToolFactory's per-run Tools to the toolbox; defer
  its Cleanup; merge the session mailbox with the critic's nudges into ONE
  WithSteer; after the run, call PostRun with the full transcript
  (runRes.Messages) → Result.PostRunResult (best-effort, never fails the run).
- run.Result += PostRunResult *tool.PostRunResult.
- dropped the now-dead criticBinding.steerOptions (superseded by drainSteer).

Tests: a factory whose PostRun emits an artifact from the output+transcript +
Cleanup lands on Result.PostRunResult; a factory-added tool is callable.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 18:13:16 -04:00
steve 82a816ae29 ci(gadfly): trim pool to the strong 6 (drop m5/qwen3.6, gemma4, gpt-oss, kimi-k2.7)
executus CI / test (push) Successful in 46s
Pool now: minimax-m3, glm-5.2, glm-5.1, deepseek-v4-pro, nemotron-3-super,
qwen3-coder:480b (all cloud, ollama-cloud=3). Removed the low-value reviewers +
the last local endpoint (m5).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 18:06:36 -04:00
steve be4bbbcad5 run: fix statusFor — don't relabel a generic error / caller-cancel as timeout (gadfly #11)
executus CI / test (pull_request) Successful in 47s
executus CI / test (push) Successful in 45s
The WithCancelCause+timer rewrite made MaxRuntime surface as Canceled (not
DeadlineExceeded), so statusFor's context.Cause(DeadlineExceeded) check could
relabel (a) a genuine run error as 'timeout' and (b) a caller cancel/deadline as
'timeout' (was 'cancelled'). Convergent gadfly finding (glm-5.2 + cluster).

Fix: keep MaxRuntime as WithTimeout (its DeadlineExceeded propagates → 'timeout',
preserving own-timeout vs caller-cancel), add a NESTED WithCancelCause layer only
for the kill. statusFor consults context.Cause ONLY for ErrCriticKill; everything
else is classified by the run error itself. Tests: generic-error-not-relabeled +
caller-cancel-stays-cancelled.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 17:00:26 -04:00
steve 390e6cf905 run: critic parity — fuller RecordStep + cause-carrying Kill (distinct status)
executus CI / test (pull_request) Successful in 46s
Adversarial Review (Gadfly) / review (pull_request) Successful in 22m30s
Completes the run-critic seam so a host adapter (mort's agentcritic) has full
fidelity, closing the two limitations gadfly surfaced on mort #1334.

- RecordStep(iter int, resp *llm.Response): the completed step's model response
  is now passed to the critic (was index-only), so a host that records a trace
  (mort's ProgressRecorder) can show what the agent actually produced, not just
  an iteration count. The executor forwards s.Response; the battery ignores it
  (its Progress is count-based).
- CriticHandle.KillCause() error + ErrCriticKill: the executor now distinguishes
  an explicit critic KILL from a natural backstop expiry. runCtx uses a
  cause-carrying cancel (WithCancelCause + a MaxRuntime timer cancelling with
  DeadlineExceeded); the deadline-watch cancels with ErrCriticKill when
  KillCause()!=nil, else DeadlineExceeded. statusFor reads context.Cause →
  killed / timeout / cancelled are now distinct (were all "cancelled"). The
  battery sets killCause from Decision.KillReason on a Kill.

Tests: statusFor "killed" case (cause=ErrCriticKill, err=Canceled); fake handle
+ battery RecordStep/KillCause signatures. Core stays battery-free.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 16:35:13 -04:00
steve 1a1d5e417b chore: go mod tidy (add missing go.sum entry; CI tidiness gate)
executus CI / test (pull_request) Successful in 2m8s
executus CI / test (push) Successful in 1m45s
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 14:53:58 -04:00
steve f3bd43b726 ci(gadfly): drop the m1 reviewer (dead weight; keep m5)
executus CI / test (pull_request) Failing after 1m1s
m1/qwen3:14b proved consistently low-value + slowest in the pool over multiple
PRs. Removed from GADFLY_MODELS + GADFLY_PROVIDER_CONCURRENCY + its endpoint so it
never fires again. m5 retained.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 14:41:14 -04:00
steve 306d575c31 critic: overflow-guard maxSteps += RaiseStepsBy (gadfly 5-model convergence)
executus CI / test (pull_request) Has been cancelled
A buggy/hostile Escalator returning a huge RaiseStepsBy could wrap handle.maxSteps
negative (which the executor reads as defer-to-base). Clamp at math.MaxInt.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 14:38:48 -04:00
steve 4ba83ab905 run: critic can raise a run's step ceiling mid-flight (CriticHandle.MaxSteps)
executus CI / test (pull_request) Failing after 1m1s
Adversarial Review (Gadfly) / review (pull_request) Successful in 21m8s
Prerequisite for a full-fidelity mort agentcritic adapter (which adjusts a
healthy-but-long run's iteration budget, not just its deadline). executus's
CriticHandle was deadline+steer only; this adds the dynamic step ceiling above
an unchanged majordomo (which already exposes WithMaxStepsFunc).

- run.RunInfo += MaxIterations (the run's base ceiling, so a critic can raise it
  relative to the baseline).
- run.CriticHandle += MaxSteps() int — polled by the executor each step via
  agent.WithMaxStepsFunc; <=0 defers to the base. The executor uses
  WithMaxStepsFunc(critic.MaxSteps) when a critic is active, else WithMaxSteps.
- critic battery: handle.maxSteps (initialised from RunInfo.MaxIterations) +
  MaxSteps(); Decision gains RaiseStepsBy so an Escalator can raise the ceiling
  alongside ExtendBy. ExtendOnce default is unchanged (time-only).

Test: a critic returning MaxSteps=5 lets a base-MaxIterations=1 run complete two
tool-dispatch steps past the base ceiling. Core stays battery-free (run doesn't
import critic).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 14:16:03 -04:00
12 changed files with 619 additions and 115 deletions
+22 -61
View File
@@ -1,11 +1,8 @@
# Gadfly — agentic adversarial PR reviewer (https://gitea.stevedudenhoeffer.com/steve/gadfly). # Gadfly adversarial review — subscribes to steve/gadfly's reusable workflow and
# # INHERITS its default swarm. This stub holds only the triggers, the actor gate,
# Runs the published Gadfly image (pinned to an immutable :sha- tag — act_runner # secret forwarding, and the allow-list; the swarm config (models, lenses,
# caches :latest, and this build is what carries foreman provider-type support) # concurrency, timeouts) lives centrally in gadfly's review-reusable.yml so it is
# as a specialist swarm and posts # tuned in ONE place. Advisory only — never blocks a merge.
# ONE consolidated review comment as gitea-actions. Advisory only — never blocks a
# merge. This reviews executus PRs with 3 ollama-cloud models (3-lens suite). Gadfly
# is a simple system — findings are advisory; always double-check before acting.
name: Adversarial Review (Gadfly) name: Adversarial Review (Gadfly)
@@ -32,62 +29,26 @@ concurrency:
jobs: jobs:
review: review:
# Security: only trusted users may trigger a secret-bearing run via a PR # Security: only trusted users may trigger a secret-bearing run via a PR
# comment (pull_request + workflow_dispatch are already trusted). Mirrors # comment (pull_request + workflow_dispatch are already trusted). Mirrors the
# GADFLY_ALLOWED_USERS, the in-container belt-and-suspenders check. # allowed_users input below (the in-container belt-and-suspenders check) — both
# lists must stay in sync; a workflow if: can't read a workflow_call input.
if: >- if: >-
github.event_name != 'issue_comment' github.event_name != 'issue_comment'
|| (github.event.issue.pull_request || (github.event.issue.pull_request
&& (github.actor == 'steve' && (github.actor == 'steve'
|| github.actor == 'fizi' || github.actor == 'fizi'
|| github.actor == 'dazed')) || github.actor == 'dazed'))
runs-on: ubuntu-latest # Tracks gadfly's v1 release tag — a curated pointer re-moved on each release
# Full fleet: 3 cloud (lens fan-out) + M1/M5 Macs via foreman. The slow local # (unlike @main, which moves on every push). Central swarm tuning propagates
# lanes dominate wall time, so allow plenty of headroom. # here automatically; the tradeoff vs a full sha pin is that v1 is mutable.
timeout-minutes: 90 uses: steve/gadfly/.gitea/workflows/review-reusable.yml@v1
steps: # Least privilege: forward only the review secrets (not `secrets: inherit`,
- uses: docker://gitea.stevedudenhoeffer.com/steve/gadfly:sha-d7f364d # which would expose every repo secret). GITEA_TOKEN is the automatic token.
env: secrets:
GITEA_API: ${{ github.server_url }}/api/v1/repos/${{ github.repository }} OLLAMA_CLOUD_API_KEY: ${{ secrets.OLLAMA_CLOUD_API_KEY }}
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }} CLAUDE_CODE_OAUTH_TOKEN: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }}
OLLAMA_CLOUD_API_KEY: ${{ secrets.OLLAMA_CLOUD_API_KEY }} GADFLY_FINDINGS_URL: ${{ secrets.GADFLY_FINDINGS_URL }}
# Local Macs, reached through their foreman queues (native Ollama on the GADFLY_FINDINGS_TOKEN: ${{ secrets.GADFLY_FINDINGS_TOKEN }}
# wire). GADFLY_ENDPOINT_M1 registers provider "m1", _M5 registers "m5", with:
# each a foreman-preset Ollama client at the secret's URL, of the form: # Consumer-specific allow-list; everything else is inherited.
# foreman|https://<foreman-host>|<token> allowed_users: "steve,fizi,dazed"
# Needs an image with foreman provider-type support (this one). If a Mac
# is offline that model's comment shows an error and the others still post.
# (Gitea secrets aren't auto-exposed — map each explicitly.)
GADFLY_ENDPOINT_M1: ${{ secrets.GADFLY_ENDPOINT_M1 }}
GADFLY_ENDPOINT_M5: ${{ secrets.GADFLY_ENDPOINT_M5 }}
# Full fleet: 3 cloud + M1 Pro + M5 Max. The Macs are back so the
# gadfly-reports scoreboard can quantify whether they earn their keep
# (they previously took 2629 min for ZERO real findings — now measured).
# Cloud concurrency lives in the LENSES: one cloud model at a time
# (ollama-cloud=1) with its 3 lenses concurrent (LENS ollama-cloud=3) so
# its comment lands sooner; each Mac runs one model, lenses serial (its
# foreman queue serializes anyway). All three provider lanes run parallel.
GADFLY_MODELS: "minimax-m3:cloud,glm-5.2:cloud,glm-5.1:cloud,kimi-k2.7-code:cloud,deepseek-v4-pro:cloud,nemotron-3-super:cloud,gpt-oss:120b-cloud,qwen3-coder:480b-cloud,gemma4:cloud,m1/qwen3:14b,m5/qwen3.6:35b-mlx"
GADFLY_PROVIDER_CONCURRENCY: "ollama-cloud=3,m1=1,m5=1"
GADFLY_PROVIDER_LENS_CONCURRENCY: "ollama-cloud=3"
# Default => the 3-lens suite (security, correctness, error-handling).
# Set the repo var GADFLY_SPECIALISTS to override (csv / "all" / "auto").
GADFLY_SPECIALISTS: ${{ vars.GADFLY_SPECIALISTS || 'security,correctness,error-handling' }}
# Per-lens deadline + bounded steps so the slow local models stay sane.
GADFLY_TIMEOUT_SECS: "600"
GADFLY_MAX_STEPS: "14"
# Allow-list for the comment trigger (mirrors the job-level if: guard).
GADFLY_ALLOWED_USERS: "steve,fizi,dazed"
# --- findings telemetry: POST runs + findings to the gadfly-reports store ---
# Advisory & off unless GADFLY_FINDINGS_URL is set; failures only log to
# stderr and never affect the review. GADFLY_REPO / GADFLY_PR are derived
# in-container; the URL + token are user-scope secrets.
GADFLY_FINDINGS_URL: ${{ secrets.GADFLY_FINDINGS_URL }}
GADFLY_FINDINGS_TOKEN: ${{ secrets.GADFLY_FINDINGS_TOKEN }}
# --- event context (leave as-is) ---
EVENT_NAME: ${{ github.event_name }}
PR: ${{ github.event.pull_request.number || github.event.issue.number || github.event.inputs.pr_number }}
PR_BRANCH: ${{ github.head_ref }}
IS_DRAFT: ${{ github.event.pull_request.draft }}
COMMENT_BODY: ${{ github.event.comment.body }}
COMMENT_ID: ${{ github.event.comment.id }}
ACTOR: ${{ github.actor }}
+46 -10
View File
@@ -10,13 +10,17 @@
// Mort plugs its LLM critic-agent in as an Escalator; ExtendOnce is the // Mort plugs its LLM critic-agent in as an Escalator; ExtendOnce is the
// zero-dependency default. // zero-dependency default.
// //
// NOTE: the executor's call into run.Ports.Critic is a P2 follow-up; this // The executor wires run.Ports.Critic (C0b): it feeds the handle activity,
// battery provides the seam + impl ahead of that wiring. // binds the run context to its extendable Deadline, drains its Steer, and polls
// MaxSteps each step so an Escalator can also raise a long run's step ceiling
// (Decision.RaiseStepsBy).
package critic package critic
import ( import (
"context" "context"
"errors"
"log/slog" "log/slog"
"math"
"sync" "sync"
"time" "time"
@@ -36,10 +40,11 @@ type Progress struct {
// Decision is the Escalator's verdict for a stalled run. Zero value = do // Decision is the Escalator's verdict for a stalled run. Zero value = do
// nothing (let the hard backstop eventually kill a truly hung run). // nothing (let the hard backstop eventually kill a truly hung run).
type Decision struct { type Decision struct {
Nudge []llm.Message // injected before the agent's next turn (a steer) Nudge []llm.Message // injected before the agent's next turn (a steer)
ExtendBy time.Duration // push the hard deadline out by this much ExtendBy time.Duration // push the hard deadline out by this much
Kill bool // cancel the run now RaiseStepsBy int // raise the run's tool-dispatch step ceiling by this
KillReason string Kill bool // cancel the run now
KillReason string
} }
// Escalator decides what to do when a run crosses its soft timeout. It is // Escalator decides what to do when a run crosses its soft timeout. It is
@@ -136,6 +141,7 @@ func (s *System) Monitor(ctx context.Context, info run.RunInfo, softTimeout time
now: s.now, now: s.now,
lastActivity: now, lastActivity: now,
deadline: now.Add(time.Duration(float64(softTimeout) * s.backstopMul)), deadline: now.Add(time.Duration(float64(softTimeout) * s.backstopMul)),
maxSteps: info.MaxIterations, // base ceiling; an Escalator may RaiseStepsBy
stopCh: make(chan struct{}), stopCh: make(chan struct{}),
} }
go h.watch(ctx, check) go h.watch(ctx, check)
@@ -155,13 +161,17 @@ type handle struct {
deadline time.Time deadline time.Time
steer []llm.Message steer []llm.Message
iterations int iterations int
maxSteps int // current tool-dispatch ceiling (base MaxIterations, raised by RaiseStepsBy)
lastTool string lastTool string
killed bool // sticky: once an Escalator kills, no later decision un-kills it killed bool // sticky: once an Escalator kills, no later decision un-kills it
killCause error // non-nil once killed; surfaced via KillCause for "killed" status
stopped bool stopped bool
stopCh chan struct{} stopCh chan struct{}
} }
func (h *handle) RecordStep(iter int) { func (h *handle) RecordStep(iter int, _ *llm.Response) {
// This battery's Progress tracks iteration count + activity, not per-step
// payload, so the response is unused here; a richer Escalator could record it.
h.mu.Lock() h.mu.Lock()
h.iterations = iter h.iterations = iter
h.lastActivity = h.now() h.lastActivity = h.now()
@@ -192,6 +202,18 @@ func (h *handle) Deadline() time.Time {
return h.deadline return h.deadline
} }
func (h *handle) MaxSteps() int {
h.mu.Lock()
defer h.mu.Unlock()
return h.maxSteps
}
func (h *handle) KillCause() error {
h.mu.Lock()
defer h.mu.Unlock()
return h.killCause
}
func (h *handle) Stop() { func (h *handle) Stop() {
h.mu.Lock() h.mu.Lock()
if !h.stopped { if !h.stopped {
@@ -254,8 +276,13 @@ func (h *handle) tick(ctx context.Context) {
} }
if d.Kill { if d.Kill {
h.killed = true h.killed = true
h.deadline = h.now() // immediate hard deadline → executor cancels reason := d.KillReason
return // ignore any Nudge/ExtendBy paired with a Kill if reason == "" {
reason = "critic killed the run"
}
h.killCause = errors.New(reason) // surfaced via KillCause → "killed" status
h.deadline = h.now() // immediate hard deadline → executor cancels
return // ignore any Nudge/ExtendBy paired with a Kill
} }
if len(d.Nudge) > 0 { if len(d.Nudge) > 0 {
h.steer = append(h.steer, d.Nudge...) h.steer = append(h.steer, d.Nudge...)
@@ -263,4 +290,13 @@ func (h *handle) tick(ctx context.Context) {
if d.ExtendBy > 0 { if d.ExtendBy > 0 {
h.deadline = h.deadline.Add(d.ExtendBy) h.deadline = h.deadline.Add(d.ExtendBy)
} }
if d.RaiseStepsBy > 0 {
// Overflow-safe: a buggy Escalator returning a huge delta must not wrap
// maxSteps negative (which the executor would read as "defer to base").
if d.RaiseStepsBy > math.MaxInt-h.maxSteps {
h.maxSteps = math.MaxInt
} else {
h.maxSteps += d.RaiseStepsBy
}
}
} }
+1 -1
View File
@@ -51,7 +51,7 @@ func TestMonitorEscalatesOncePerIdlePeriodAndExtends(t *testing.T) {
t.Error("deadline should have been extended past the original") t.Error("deadline should have been extended past the original")
} }
// A fresh step re-arms; another idle period escalates again. // A fresh step re-arms; another idle period escalates again.
h.RecordStep(1) h.RecordStep(1, nil)
time.Sleep(60 * time.Millisecond) time.Sleep(60 * time.Millisecond)
mu.Lock() mu.Lock()
c2 := calls c2 := calls
+1
View File
@@ -125,6 +125,7 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+40 -14
View File
@@ -2,9 +2,11 @@ package run
import ( import (
"context" "context"
"fmt"
"time" "time"
"gitea.stevedudenhoeffer.com/steve/majordomo/agent" "gitea.stevedudenhoeffer.com/steve/majordomo/agent"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
) )
// criticDeadlineCheck is how often the deadline-watch goroutine polls the // criticDeadlineCheck is how often the deadline-watch goroutine polls the
@@ -21,12 +23,15 @@ type criticBinding struct {
} }
// startCritic begins critic monitoring for this run when one is configured and // startCritic begins critic monitoring for this run when one is configured and
// the agent enables it. It launches a goroutine that cancels runCtx (via cancel) // the agent enables it. It launches a goroutine that cancels runCtx (via
// the moment the critic's hard deadline passes — the critic may extend that // cancelCause) the moment the critic's hard deadline passes — the critic may
// deadline, so a healthy-but-slow run is given room while a hung one is killed. // extend that deadline, so a healthy-but-slow run is given room while a hung one
// Returns (nil, no-op stop) when there is no critic. The caller MUST defer the // is killed. When the deadline passes because the critic KILLED the run
// returned stop. // (KillCause() != nil), the cancellation cause is ErrCriticKill (→ status
func (e *Executor) startCritic(runCtx context.Context, cancel context.CancelFunc, ra RunnableAgent, info RunInfo) (*criticBinding, func()) { // "killed"); when the backstop simply expired, it is context.DeadlineExceeded (→
// "timeout"). Returns (nil, no-op stop) when there is no critic. The caller MUST
// defer the returned stop.
func (e *Executor) startCritic(runCtx context.Context, cancelCause context.CancelCauseFunc, ra RunnableAgent, info RunInfo) (*criticBinding, func()) {
noop := func() {} noop := func() {}
if e.cfg.Ports.Critic == nil || !ra.Critic.Enabled { if e.cfg.Ports.Critic == nil || !ra.Critic.Enabled {
return nil, noop return nil, noop
@@ -55,9 +60,14 @@ func (e *Executor) startCritic(runCtx context.Context, cancel context.CancelFunc
return return
case <-t.C: case <-t.C:
// A zero deadline = no hard cap (not yet set); otherwise cancel // A zero deadline = no hard cap (not yet set); otherwise cancel
// once we're at or past it. // once we're at or past it, distinguishing an explicit kill from a
// natural backstop expiry so the run gets the right status.
if d := h.Deadline(); !d.IsZero() && !time.Now().Before(d) { if d := h.Deadline(); !d.IsZero() && !time.Now().Before(d) {
cancel() if cause := h.KillCause(); cause != nil {
cancelCause(fmt.Errorf("%w: %s", ErrCriticKill, cause.Error()))
} else {
cancelCause(context.DeadlineExceeded)
}
return return
} }
} }
@@ -69,9 +79,9 @@ func (e *Executor) startCritic(runCtx context.Context, cancel context.CancelFunc
} }
} }
func (b *criticBinding) recordStep(iter int) { func (b *criticBinding) recordStep(iter int, resp *llm.Response) {
if b != nil { if b != nil {
b.h.RecordStep(iter) b.h.RecordStep(iter, resp)
} }
} }
@@ -88,11 +98,27 @@ func (b *criticBinding) recordToolStart(name, args string) {
} }
} }
// steerOptions returns the agent RunOptions that drain the critic's steer // maxStepsOption returns the agent step-ceiling Option. With no critic it's a
// messages into the loop. Empty when there is no critic. // fixed WithMaxSteps(base); with a critic it's a DYNAMIC WithMaxStepsFunc that
func (b *criticBinding) steerOptions() []agent.RunOption { // polls the handle each step (so the critic can raise a long run's budget),
// falling back to base when the handle defers (MaxSteps() <= 0).
func (b *criticBinding) maxStepsOption(base int) agent.Option {
if b == nil {
return agent.WithMaxSteps(base)
}
return agent.WithMaxStepsFunc(func() int {
if n := b.h.MaxSteps(); n > 0 {
return n
}
return base
})
}
// drainSteer returns the critic's queued steer messages (nil-safe), so the
// executor can merge them with the session steer mailbox into one WithSteer.
func (b *criticBinding) drainSteer() []llm.Message {
if b == nil { if b == nil {
return nil return nil
} }
return []agent.RunOption{agent.WithSteer(b.h.Steer)} return b.h.Steer()
} }
+41 -1
View File
@@ -23,9 +23,16 @@ type fakeCriticHandle struct {
mu sync.Mutex mu sync.Mutex
steps, tools, stops int steps, tools, stops int
steered int steered int
maxSteps int // 0 => defer to the run's base MaxIterations
killCause error // non-nil simulates a critic kill
} }
func (h *fakeCriticHandle) RecordStep(int) { h.mu.Lock(); h.steps++; h.mu.Unlock() } func (h *fakeCriticHandle) RecordStep(int, *llm.Response) { h.mu.Lock(); h.steps++; h.mu.Unlock() }
func (h *fakeCriticHandle) KillCause() error {
h.mu.Lock()
defer h.mu.Unlock()
return h.killCause
}
func (h *fakeCriticHandle) RecordToolStart(string, string) { func (h *fakeCriticHandle) RecordToolStart(string, string) {
h.mu.Lock() h.mu.Lock()
h.tools++ h.tools++
@@ -33,8 +40,41 @@ func (h *fakeCriticHandle) RecordToolStart(string, string) {
} }
func (h *fakeCriticHandle) Steer() []llm.Message { h.mu.Lock(); h.steered++; h.mu.Unlock(); return nil } func (h *fakeCriticHandle) Steer() []llm.Message { h.mu.Lock(); h.steered++; h.mu.Unlock(); return nil }
func (h *fakeCriticHandle) Deadline() time.Time { return time.Time{} } // no hard deadline func (h *fakeCriticHandle) Deadline() time.Time { return time.Time{} } // no hard deadline
func (h *fakeCriticHandle) MaxSteps() int { h.mu.Lock(); defer h.mu.Unlock(); return h.maxSteps }
func (h *fakeCriticHandle) Stop() { h.mu.Lock(); h.stops++; h.mu.Unlock() } func (h *fakeCriticHandle) Stop() { h.mu.Lock(); h.stops++; h.mu.Unlock() }
// TestCriticRaisesStepCeiling: a critic returning a higher MaxSteps lets the agent
// run PAST its base MaxIterations (the dynamic step ceiling). With base=1 and no
// critic the run would hit ErrMaxSteps after the first tool-dispatch step; the
// critic raises it to 5 so the run completes.
func TestCriticRaisesStepCeiling(t *testing.T) {
h := &fakeCriticHandle{maxSteps: 5}
fp := fake.New("fake")
fp.Enqueue("m",
// two tool-call steps (unknown tool → tolerated error results), then answer
fake.ReplyWith(llm.Response{ToolCalls: []llm.ToolCall{{ID: "c1", Name: "noop", Arguments: []byte(`{}`)}}}),
fake.ReplyWith(llm.Response{ToolCalls: []llm.ToolCall{{ID: "c2", Name: "noop", Arguments: []byte(`{}`)}}}),
fake.Reply("done after 2 tool steps"),
)
m, _ := fp.Model("m")
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
Ports: run.Ports{Critic: &fakeCritic{h: h}},
// large soft timeout so the deadline-watch never interferes in the test
Defaults: run.Defaults{CriticSoftTimeout: time.Hour},
})
res := ex.Run(context.Background(),
run.RunnableAgent{Name: "x", ModelTier: "m", MaxIterations: 1, Critic: run.CriticConfig{Enabled: true}},
tool.Invocation{RunID: "r"}, "go")
if res.Err != nil {
t.Fatalf("critic raised the ceiling to 5, run should complete past base=1: %v", res.Err)
}
if res.Output != "done after 2 tool steps" {
t.Errorf("output = %q", res.Output)
}
}
// TestCriticWired: an agent with Critic.Enabled gets monitored — Monitor returns // TestCriticWired: an agent with Critic.Enabled gets monitored — Monitor returns
// a handle the executor feeds (RecordStep), drains (Steer), and stops. // a handle the executor feeds (RecordStep), drains (Steer), and stops.
func TestCriticWired(t *testing.T) { func TestCriticWired(t *testing.T) {
+120 -19
View File
@@ -4,6 +4,7 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"strings"
"time" "time"
"gitea.stevedudenhoeffer.com/steve/majordomo/agent" "gitea.stevedudenhoeffer.com/steve/majordomo/agent"
@@ -101,6 +102,10 @@ type Result struct {
Steps []tool.Step Steps []tool.Step
Usage llm.Usage Usage llm.Usage
Err error Err error
// PostRunResult carries artifacts produced by a SessionToolFactory's PostRun
// hook (rendered images, files). nil when no factory was set or PostRun
// returned nil. The host delivers these (e.g. mort's chat API / Discord).
PostRunResult *tool.PostRunResult
} }
// Run executes ra with the given invocation + input and returns the Result. It // Run executes ra with the given invocation + input and returns the Result. It
@@ -156,14 +161,15 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
// Audit start (optional). The recorder satisfies RunTally; stamp it on the // Audit start (optional). The recorder satisfies RunTally; stamp it on the
// invocation so a self-status tool can read live progress. // invocation so a self-status tool can read live progress.
info := RunInfo{ info := RunInfo{
RunID: inv.RunID, RunID: inv.RunID,
SubjectID: ra.ID, SubjectID: ra.ID,
Name: ra.Name, Name: ra.Name,
CallerID: inv.CallerID, CallerID: inv.CallerID,
ChannelID: inv.ChannelID, ChannelID: inv.ChannelID,
ParentRunID: inv.ParentRunID, ParentRunID: inv.ParentRunID,
Inputs: inv.SkillInputs, Inputs: inv.SkillInputs,
StartedAt: started, StartedAt: started,
MaxIterations: maxIter,
} }
var rec RunRecorder var rec RunRecorder
var stateAcc *RunStateAccessor var stateAcc *RunStateAccessor
@@ -175,6 +181,12 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
inv.RunState = stateAcc inv.RunState = stateAcc
} }
// Steer mailbox: lets session tools (via inv.AttachImages) feed multimodal
// messages into the running conversation before its next step. Created BEFORE
// the toolbox build so any tool's handler captures the live AttachImages seam.
mailbox := &steerMailbox{}
inv.AttachImages = (&runSession{mailbox: mailbox}).AttachImages
// Build the toolbox from the agent's low-level tools. // Build the toolbox from the agent's low-level tools.
toolbox, err := e.cfg.Registry.Build(ra.LowLevelTools, inv, tool.Visibility("private"), nil) toolbox, err := e.cfg.Registry.Build(ra.LowLevelTools, inv, tool.Visibility("private"), nil)
if err != nil { if err != nil {
@@ -191,20 +203,56 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
return res return res
} }
// Per-invocation ExtraTools + a SessionToolFactory's per-run tools, added on
// top of the agent's palette. The factory closes over the live session (the
// AttachImages mailbox); its PostRun hook (held for after the run) produces
// artifacts attached to res.PostRunResult, and its Cleanup is deferred. All
// nil-safe.
for _, t := range inv.ExtraTools {
if err := toolbox.Add(t); err != nil {
res.Err = fmt.Errorf("add extra tool: %w", err)
e.finishAudit(ctx, rec, "error", res, started, res.Err)
return res
}
}
var postRun func(ctx context.Context, transcript []llm.Message, output string, runErr error) *tool.PostRunResult
if inv.SessionToolFactory != nil {
st := inv.SessionToolFactory(&runSession{mailbox: mailbox})
if st.Cleanup != nil {
defer safeCleanup(st.Cleanup) // panic-isolated, like runPostRun
}
for _, t := range st.Tools {
if err := toolbox.Add(t); err != nil {
res.Err = fmt.Errorf("add session tool: %w", err)
e.finishAudit(ctx, rec, "error", res, started, res.Err)
return res
}
}
postRun = st.PostRun
}
// Run context: bound by MaxRuntime, detached from the caller's deadline so a // Run context: bound by MaxRuntime, detached from the caller's deadline so a
// lane/queue wait doesn't eat the run budget (mort's V10 lesson). Caller // lane/queue wait doesn't eat the run budget (mort's V10 lesson). Caller
// cancellation still propagates via MergeCancellation. Created BEFORE the // cancellation still propagates via MergeCancellation. Created BEFORE the
// step observer so the observer forwards the merged run context (not a // step observer so the observer forwards the merged run context (not a
// possibly-cancelled caller ctx) to OnStep consumers. // possibly-cancelled caller ctx) to OnStep consumers.
runCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), maxRuntime) // MaxRuntime stays a WithTimeout so its DeadlineExceeded propagates through the
defer cancel() // child chain (→ "timeout"), preserving the run's-own-timeout vs caller-cancel
// distinction. A NESTED cause-carrying layer lets a critic kill surface as a
// distinct "killed" without disturbing that: only an ErrCriticKill cause is
// consulted in statusFor; a generic run error or a caller cancel is classified
// by the run error itself.
timeoutCtx, cancelTimeout := context.WithTimeout(context.WithoutCancel(ctx), maxRuntime)
defer cancelTimeout()
runCtx, cancelCause := context.WithCancelCause(timeoutCtx)
defer cancelCause(nil)
runCtx, mergeCancel := MergeCancellation(runCtx, ctx) runCtx, mergeCancel := MergeCancellation(runCtx, ctx)
defer mergeCancel() defer mergeCancel()
// Critic (optional): monitors the run for a stall, can nudge/extend/kill via // Critic (optional): monitors the run for a stall, can nudge/extend/kill via
// its host Escalator. Its hard deadline is bound to runCtx (cancel on pass). // its host Escalator. Its hard deadline is bound to runCtx (cancel on pass).
// nil-safe: no-op when no critic is configured or the agent doesn't enable it. // nil-safe: no-op when no critic is configured or the agent doesn't enable it.
critic, stopCritic := e.startCritic(runCtx, cancel, ra, info) critic, stopCritic := e.startCritic(runCtx, cancelCause, ra, info)
defer stopCritic() defer stopCritic()
// Step instrumentation: accumulate Result.Steps + fire inv.OnStep, feed the // Step instrumentation: accumulate Result.Steps + fire inv.OnStep, feed the
@@ -221,7 +269,7 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
if rec != nil { if rec != nil {
rec.OnStep(s.Index, s.Response) rec.OnStep(s.Index, s.Response)
} }
critic.recordStep(s.Index) // keep the critic's activity clock fresh critic.recordStep(s.Index, s.Response) // keep the critic's activity clock fresh + carry the step payload
var calls []llm.ToolCall var calls []llm.ToolCall
if s.Response != nil { if s.Response != nil {
calls = s.Response.ToolCalls calls = s.Response.ToolCalls
@@ -243,7 +291,10 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
opts := []agent.Option{ opts := []agent.Option{
agent.WithToolbox(toolbox), agent.WithToolbox(toolbox),
agent.WithMaxSteps(maxIter), // Step ceiling: a fixed WithMaxSteps(maxIter) normally, but when a critic is
// active it owns a DYNAMIC ceiling (WithMaxStepsFunc) so it can raise a
// healthy-but-long run's budget mid-flight. Falls back to maxIter.
critic.maxStepsOption(maxIter),
agent.WithToolErrorLimits(e.cfg.Defaults.MaxConsecutiveToolErrors, e.cfg.Defaults.MaxSameToolCallRepeats), agent.WithToolErrorLimits(e.cfg.Defaults.MaxConsecutiveToolErrors, e.cfg.Defaults.MaxSameToolCallRepeats),
agent.WithStepObserver(stepObserver), agent.WithStepObserver(stepObserver),
} }
@@ -267,9 +318,12 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
} }
ag := agent.New(model, e.systemPrompt(ra), opts...) ag := agent.New(model, e.systemPrompt(ra), opts...)
runRes, runErr := ag.Run(runCtx, input, critic.steerOptions()...) // One WithSteer drains BOTH the session mailbox (a tool's AttachImages) and
// the critic's nudges before each step.
steer := func() []llm.Message { return append(mailbox.drain(), critic.drainSteer()...) }
runRes, runErr := runAgent(runCtx, ag, input, inv.Images, agent.WithSteer(steer))
status := statusFor(runErr) status := statusFor(runCtx, runErr)
if runRes != nil { if runRes != nil {
res.Output = runRes.Output res.Output = runRes.Output
res.Usage = runRes.Usage res.Usage = runRes.Usage
@@ -277,6 +331,19 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
res.Steps = emitter.snapshot() res.Steps = emitter.snapshot()
res.Err = runErr res.Err = runErr
// PostRun: hand the SessionToolFactory's hook the full transcript (populated
// even on partial results) so it can produce artifacts. Best-effort +
// panic-isolated — a PostRun failure never fails an otherwise-successful run.
if postRun != nil {
var transcript []llm.Message
if runRes != nil {
transcript = runRes.Messages
}
// Detach from the caller's ctx: a finished/cancelled caller must not abort
// artifact production (the hook owns its own bounding, per its contract).
res.PostRunResult = runPostRun(detach(ctx), postRun, transcript, res.Output, runErr)
}
e.finishAudit(ctx, rec, status, res, started, runErr) e.finishAudit(ctx, rec, status, res, started, runErr)
if e.cfg.Ports.Budget != nil { if e.cfg.Ports.Budget != nil {
e.cfg.Ports.Budget.Commit(detach(ctx), inv.CallerID, time.Since(started).Seconds()) e.cfg.Ports.Budget.Commit(detach(ctx), inv.CallerID, time.Since(started).Seconds())
@@ -285,13 +352,22 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
return res return res
} }
// statusFor maps a run error to a RunStats.Status, distinguishing a deadline // statusFor maps a run error to a RunStats.Status, distinguishing a critic kill
// (timeout) and a cancellation (cancelled — caller cancel or shutdown) from a // (killed), a deadline (timeout), and a cancellation (cancelled — caller cancel
// generic error so audit consumers can tell them apart. // or shutdown) from a generic error so audit consumers can tell them apart. The
func statusFor(runErr error) string { // run context's cancellation cause carries the distinction (ErrCriticKill /
// DeadlineExceeded), since ctx.Err() alone only reports Canceled.
func statusFor(runCtx context.Context, runErr error) string {
switch { switch {
case runErr == nil: case runErr == nil:
return "ok" return "ok"
// Only the kill is recovered from the cancellation cause — a critic kill
// surfaces as a plain Canceled run error, so without this it'd read as
// "cancelled". Everything else is classified by the run error itself, so a
// genuine run error is never relabeled just because the context was later
// cancelled, and a caller cancel/deadline stays "cancelled" (not "timeout").
case errors.Is(context.Cause(runCtx), ErrCriticKill):
return "killed"
case errors.Is(runErr, context.DeadlineExceeded): case errors.Is(runErr, context.DeadlineExceeded):
return "timeout" return "timeout"
case errors.Is(runErr, context.Canceled): case errors.Is(runErr, context.Canceled):
@@ -365,3 +441,28 @@ func detach(ctx context.Context) context.Context {
_ = cancel // bounded by the timeout; nothing to cancel early _ = cancel // bounded by the timeout; nothing to cancel early
return c return c
} }
// runAgent dispatches the majordomo agent loop. majordomo's Run takes a text-only
// input arg, so when the invocation carries images they're folded into the first
// user message (text + image parts) via WithHistory and Run is called with an
// empty input — the model then sees a multimodal opening turn. The image-less path
// passes the prompt straight through.
//
// The text part is omitted when input is blank (image-only run), matching
// runSession.AttachImages so no empty TextPart is sent.
func runAgent(ctx context.Context, ag *agent.Agent, input string, images []llm.ImagePart, opts ...agent.RunOption) (*agent.Result, error) {
if len(images) == 0 {
return ag.Run(ctx, input, opts...)
}
parts := make([]llm.Part, 0, len(images)+1)
if strings.TrimSpace(input) != "" {
parts = append(parts, llm.Text(input))
}
for _, img := range images {
parts = append(parts, img)
}
// Copy opts before appending so a caller-supplied backing array is never
// mutated/aliased (the variadic slice can have spare capacity).
opts = append(opts[:len(opts):len(opts)], agent.WithHistory([]llm.Message{llm.UserParts(parts...)}))
return ag.Run(ctx, "", opts...)
}
+20 -7
View File
@@ -148,20 +148,33 @@ func TestExecutorNilModelNoPanic(t *testing.T) {
} }
} }
// TestStatusFor maps run errors to RunStats.Status (gadfly F3). // TestStatusFor maps run errors + cancellation cause to RunStats.Status (gadfly F3).
func TestStatusFor(t *testing.T) { func TestStatusFor(t *testing.T) {
bg := context.Background()
// A context cancelled with the critic-kill cause: ctx.Err() is Canceled, but
// context.Cause carries ErrCriticKill → "killed".
killCtx, killCancel := context.WithCancelCause(context.Background())
killCancel(fmt.Errorf("%w: hung", ErrCriticKill))
// A context cancelled with a non-kill cause must NOT relabel a genuine run
// error: a real error stays "error" even though the ctx was later cancelled.
cancelledCtx, cc := context.WithCancelCause(context.Background())
cc(context.DeadlineExceeded)
cases := []struct { cases := []struct {
ctx context.Context
err error err error
want string want string
}{ }{
{nil, "ok"}, {bg, nil, "ok"},
{context.DeadlineExceeded, "timeout"}, {bg, context.DeadlineExceeded, "timeout"},
{context.Canceled, "cancelled"}, {bg, context.Canceled, "cancelled"},
{fmt.Errorf("wrapped: %w", context.DeadlineExceeded), "timeout"}, {bg, fmt.Errorf("wrapped: %w", context.DeadlineExceeded), "timeout"},
{errors.New("boom"), "error"}, {bg, errors.New("boom"), "error"},
{killCtx, context.Canceled, "killed"},
{cancelledCtx, errors.New("boom"), "error"}, // generic error not relabeled by cause
{cancelledCtx, context.Canceled, "cancelled"}, // caller cancel stays cancelled, not timeout
} }
for _, c := range cases { for _, c := range cases {
if got := statusFor(c.err); got != c.want { if got := statusFor(c.ctx, c.err); got != c.want {
t.Errorf("statusFor(%v) = %q, want %q", c.err, got, c.want) t.Errorf("statusFor(%v) = %q, want %q", c.err, got, c.want)
} }
} }
+121
View File
@@ -0,0 +1,121 @@
package run_test
import (
"context"
"strings"
"testing"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
"gitea.stevedudenhoeffer.com/steve/executus/run"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
// TestExecutorFoldsInitialImages: when the invocation carries Images, they're
// folded into the first user message (alongside the prompt text) instead of being
// dropped — majordomo's Run input arg is text-only, so the executor seeds the
// multimodal opening turn via history.
func TestExecutorFoldsInitialImages(t *testing.T) {
fp := fake.New("fake")
fp.Enqueue("m", fake.Reply("saw the image"))
m, _ := fp.Model("m")
img := llm.ImagePart{MIME: "image/png", Data: []byte("PNGDATA")}
inv := tool.Invocation{RunID: "r1", Images: []llm.ImagePart{img}}
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
})
res := ex.Run(context.Background(), run.RunnableAgent{ModelTier: "m"}, inv, "describe this")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
calls := fp.Calls()
if len(calls) == 0 {
t.Fatal("no model calls recorded")
}
// The text + image must be CO-LOCATED in a single user message (not split
// across two), so the model reads them as one multimodal turn.
coLocated := false
for _, msg := range calls[0].Request.Messages {
sawImage, sawText := false, false
for _, p := range msg.Parts {
switch pp := p.(type) {
case llm.ImagePart:
if string(pp.Data) == "PNGDATA" {
sawImage = true
}
case llm.TextPart:
if strings.Contains(pp.Text, "describe this") {
sawText = true
}
}
}
if sawImage && sawText {
coLocated = true
}
}
if !coLocated {
t.Error("image + prompt text were not folded into the SAME user message")
}
}
// TestExecutorImageOnlyNoBlankText: an image-only run (blank prompt) must NOT emit
// an empty TextPart — the message carries just the image, matching
// runSession.AttachImages's guard.
func TestExecutorImageOnlyNoBlankText(t *testing.T) {
fp := fake.New("fake")
fp.Enqueue("m", fake.Reply("saw it"))
m, _ := fp.Model("m")
inv := tool.Invocation{RunID: "r3", Images: []llm.ImagePart{{MIME: "image/png", Data: []byte("IMG")}}}
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
})
res := ex.Run(context.Background(), run.RunnableAgent{ModelTier: "m"}, inv, " ")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
for _, msg := range fp.Calls()[0].Request.Messages {
for _, p := range msg.Parts {
if tp, ok := p.(llm.TextPart); ok && strings.TrimSpace(tp.Text) == "" {
t.Error("image-only run emitted a blank TextPart")
}
}
}
}
// TestExecutorTextOnlyUnchanged: with no Images, the prompt flows through as the
// text input (regression guard that the fold path didn't break the common case).
func TestExecutorTextOnlyUnchanged(t *testing.T) {
fp := fake.New("fake")
fp.Enqueue("m", fake.Reply("ok"))
m, _ := fp.Model("m")
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
})
res := ex.Run(context.Background(), run.RunnableAgent{ModelTier: "m"}, tool.Invocation{RunID: "r2"}, "plain prompt")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
calls := fp.Calls()
if len(calls) == 0 {
t.Fatal("no model calls recorded")
}
sawText := false
for _, msg := range calls[0].Request.Messages {
for _, p := range msg.Parts {
if tp, ok := p.(llm.TextPart); ok && strings.Contains(tp.Text, "plain prompt") {
sawText = true
}
}
}
if !sawText {
t.Error("text-only prompt did not reach the model")
}
}
+25 -2
View File
@@ -2,6 +2,7 @@ package run
import ( import (
"context" "context"
"errors"
"time" "time"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm" "gitea.stevedudenhoeffer.com/steve/majordomo/llm"
@@ -9,6 +10,12 @@ import (
"gitea.stevedudenhoeffer.com/steve/executus/deliver" "gitea.stevedudenhoeffer.com/steve/executus/deliver"
) )
// ErrCriticKill is the cancellation cause the executor stamps on a run the
// critic kills, so a critic kill surfaces as a distinct "killed" status (vs a
// backstop "timeout" or a caller "cancelled"). A host CriticHandle signals a
// kill via KillCause(); the executor wraps that reason with this sentinel.
var ErrCriticKill = errors.New("run: critic killed the run")
// Ports are the host seams the run executor consumes. Every field is nil-safe: // Ports are the host seams the run executor consumes. Every field is nil-safe:
// a light host passes the zero Ports and gets a bounded, in-memory run with no // a light host passes the zero Ports and gets a bounded, in-memory run with no
// persistence, audit, budget, critic, delegation, or delivery — which is // persistence, audit, budget, critic, delegation, or delivery — which is
@@ -48,6 +55,9 @@ type RunInfo struct {
ParentRunID string ParentRunID string
Inputs map[string]any Inputs map[string]any
StartedAt time.Time StartedAt time.Time
// MaxIterations is the run's base tool-dispatch step ceiling, so a critic can
// raise it relative to the baseline (see CriticHandle.MaxSteps).
MaxIterations int
} }
// RunStats is the terminal roll-up a recorder's Close writes. Mirrors mort's // RunStats is the terminal roll-up a recorder's Close writes. Mirrors mort's
@@ -120,8 +130,10 @@ type Critic interface {
// methods (the critic battery's handle guards its state with a mutex). // methods (the critic battery's handle guards its state with a mutex).
type CriticHandle interface { type CriticHandle interface {
// RecordStep / RecordToolStart keep the critic's activity clock fresh so a // RecordStep / RecordToolStart keep the critic's activity clock fresh so a
// healthy-but-slow run is not mistaken for a hang. // healthy-but-slow run is not mistaken for a hang. RecordStep also carries the
RecordStep(iter int) // completed step's model response (nil-safe) so the critic's Trace can show
// what the agent actually produced, not just an iteration count.
RecordStep(iter int, resp *llm.Response)
RecordToolStart(name, args string) RecordToolStart(name, args string)
// Steer returns any messages the critic wants injected into the loop (a // Steer returns any messages the critic wants injected into the loop (a
// nudge), drained before each step — matches majordomo agent.WithSteer. // nudge), drained before each step — matches majordomo agent.WithSteer.
@@ -129,6 +141,17 @@ type CriticHandle interface {
// Deadline returns the current hard-kill deadline (the critic may extend // Deadline returns the current hard-kill deadline (the critic may extend
// it); the executor binds the run context to it. Zero = no hard deadline. // it); the executor binds the run context to it. Zero = no hard deadline.
Deadline() time.Time Deadline() time.Time
// MaxSteps returns the current tool-dispatch step ceiling, polled by the
// executor each step (via majordomo WithMaxStepsFunc) so a critic can raise a
// healthy-but-long run's iteration budget mid-flight. Return <= 0 to defer to
// the run's base MaxIterations.
MaxSteps() int
// KillCause returns a non-nil reason iff the critic has decided to KILL this
// run (as opposed to letting the hard-deadline backstop expire). The executor
// reads it when the deadline passes: non-nil → cancel the run with
// ErrCriticKill (status "killed"); nil → the backstop expired naturally
// (status "timeout"). Hosts that never distinguish the two may return nil.
KillCause() error
// Stop ends monitoring when the run finishes. // Stop ends monitoring when the run finishes.
Stop() Stop()
} }
+88
View File
@@ -0,0 +1,88 @@
package run
import (
"context"
"log/slog"
"strings"
"sync"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
// runPostRun invokes a SessionToolFactory's PostRun hook with panic isolation:
// a PostRun panic (or a slow artifact build that the hook mishandles) must not
// fail an otherwise-successful run — artifacts are best-effort, the agent's text
// output is the source of truth.
func runPostRun(ctx context.Context,
hook func(context.Context, []llm.Message, string, error) *tool.PostRunResult,
transcript []llm.Message, output string, runErr error) (prr *tool.PostRunResult) {
defer func() {
if r := recover(); r != nil {
slog.Error("run: PostRun hook panicked; no artifacts produced", "panic", r)
prr = nil
}
}()
return hook(ctx, transcript, output, runErr)
}
// steerMailbox is a thread-safe queue of messages a session tool (via
// tool.Invocation.AttachImages) wants injected into the agent loop before its
// next step — the same WithSteer mechanism the critic uses for nudges, exposed
// to ordinary tools so they can show the model content (e.g. a rendered
// preview) it must SEE, not just be told about.
type steerMailbox struct {
mu sync.Mutex
msgs []llm.Message
}
func (m *steerMailbox) push(msg llm.Message) {
m.mu.Lock()
m.msgs = append(m.msgs, msg)
m.mu.Unlock()
}
// drain returns and clears the queued messages (nil when empty).
func (m *steerMailbox) drain() []llm.Message {
m.mu.Lock()
defer m.mu.Unlock()
if len(m.msgs) == 0 {
return nil
}
out := m.msgs
m.msgs = nil
return out
}
// runSession implements tool.AgentSession over a steer mailbox: AttachImages
// queues a user-role multimodal message the agent loop injects before its next
// step. Replaces legacy agentkit's Agent.AttachImages — majordomo's *agent.Agent
// is immutable mid-run, so mutation flows through the run-scoped steer mailbox.
type runSession struct{ mailbox *steerMailbox }
func (s *runSession) AttachImages(text string, images ...llm.ImagePart) {
parts := make([]llm.Part, 0, len(images)+1)
if strings.TrimSpace(text) != "" {
parts = append(parts, llm.Text(text))
}
for _, img := range images {
parts = append(parts, img)
}
if len(parts) == 0 {
return
}
s.mailbox.push(llm.UserParts(parts...))
}
// safeCleanup runs a SessionTools.Cleanup with panic isolation, so a misbehaving
// teardown (temp-dir removal, handle close) can't clobber an otherwise-successful
// run via the executor's top-level recover.
func safeCleanup(fn func()) {
defer func() {
if r := recover(); r != nil {
slog.Error("run: session Cleanup panicked", "panic", r)
}
}()
fn()
}
+94
View File
@@ -0,0 +1,94 @@
package run_test
import (
"context"
"testing"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
"gitea.stevedudenhoeffer.com/steve/executus/run"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
// TestSessionToolFactoryPostRun: a SessionToolFactory's PostRun hook produces an
// artifact (from the run output + transcript) that lands on Result.PostRunResult,
// and its Cleanup is deferred.
func TestSessionToolFactoryPostRun(t *testing.T) {
fp := fake.New("fake")
fp.Enqueue("m", fake.Reply("hello artifacts"))
m, _ := fp.Model("m")
cleanupCalled := false
inv := tool.Invocation{
RunID: "r1",
SessionToolFactory: func(_ tool.AgentSession) tool.SessionTools {
return tool.SessionTools{
PostRun: func(_ context.Context, transcript []llm.Message, output string, _ error) *tool.PostRunResult {
return &tool.PostRunResult{
Artifacts: []tool.Artifact{{Name: "out.txt", MimeType: "text/plain", Data: []byte(output)}},
Metadata: map[string]any{"transcript_len": len(transcript)},
}
},
Cleanup: func() { cleanupCalled = true },
}
},
}
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
})
res := ex.Run(context.Background(), run.RunnableAgent{ModelTier: "m"}, inv, "go")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
if res.PostRunResult == nil {
t.Fatal("Result.PostRunResult is nil — PostRun hook not invoked / not attached")
}
if n := len(res.PostRunResult.Artifacts); n != 1 {
t.Fatalf("artifacts = %d, want 1", n)
}
a := res.PostRunResult.Artifacts[0]
if a.Name != "out.txt" || string(a.Data) != "hello artifacts" {
t.Errorf("artifact = {%q, %q}", a.Name, string(a.Data))
}
if tl, _ := res.PostRunResult.Metadata["transcript_len"].(int); tl < 1 {
t.Errorf("transcript not passed to PostRun (len=%d)", tl)
}
if !cleanupCalled {
t.Error("Cleanup was not deferred/called")
}
}
// TestSessionToolFactoryAddsTool: tools the factory returns join the run's
// toolbox and are callable by the model.
func TestSessionToolFactoryAddsTool(t *testing.T) {
fp := fake.New("fake")
fp.Enqueue("m",
fake.ReplyWith(llm.Response{ToolCalls: []llm.ToolCall{{ID: "c1", Name: "render", Arguments: []byte(`{}`)}}}),
fake.Reply("rendered"),
)
m, _ := fp.Model("m")
toolCalled := false
renderTool := llm.DefineTool("render", "render a preview",
func(_ context.Context, _ struct{}) (any, error) { toolCalled = true; return "ok", nil })
inv := tool.Invocation{
RunID: "r2",
SessionToolFactory: func(_ tool.AgentSession) tool.SessionTools {
return tool.SessionTools{Tools: []llm.Tool{renderTool}}
},
}
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
})
res := ex.Run(context.Background(),
run.RunnableAgent{ModelTier: "m", MaxIterations: 5}, inv, "go")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
if !toolCalled {
t.Error("session-factory tool was not added to the toolbox / not called")
}
}