28 Commits

Author SHA1 Message Date
steve f367796244 Merge pull request 'run: fold inv.Images into the initial user message (multimodal opening turn)' (#16) from feat/initial-images into main
executus CI / test (push) Failing after 18m3s
Adversarial Review (Gadfly) / review (pull_request) Successful in 2s
2026-06-28 05:16:46 +00:00
steve 0acaa8c9a5 run: guard empty text part in runAgent + drop cross-repo doc ref (gadfly #16)
executus CI / test (pull_request) Successful in 1m46s
Every reviewer flagged that runAgent appended llm.Text(input) unconditionally, so
an image-only run (blank prompt) emitted an empty TextPart — inconsistent with the
sibling runSession.AttachImages which guards it. Mirror that guard
(strings.TrimSpace(input) != ""). Also:
- copy opts before appending (variadic backing array can have spare capacity; avoid
  aliasing a caller's slice).
- reword the doc comment to drop the mort-agentexec reference (executus is a
  standalone lib; a consumer name doesn't belong in its godoc).

Tests: image+text are co-located in ONE user message; an image-only run emits no
blank TextPart.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-28 01:11:15 -04:00
steve a35c176b42 run: fold inv.Images into the initial user message (multimodal opening turn)
executus CI / test (pull_request) Successful in 46s
Adversarial Review (Gadfly) / review (pull_request) Successful in 6m5s
The executor passed only the text `input` to majordomo's agent.Run, silently
dropping inv.Images — so a multimodal run (vision: chatbot @mention, chat API)
lost its images on the executus path. majordomo's Run input arg is text-only, so
fold the images into the first user message (text + image parts) via WithHistory
and call Run with empty input, mirroring mort agentexec's multimodal seeding. The
image-less path is unchanged (prompt passes straight through).

Tests: a run with Images carries the image bytes + prompt into the first model
request; the text-only path still reaches the model.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-28 00:37:53 -04:00
steve 1cf46c9954 ci: track gadfly's v1 release tag instead of a pinned sha (#15)
executus CI / test (push) Successful in 52s
2026-06-28 04:08:30 +00:00
steve 56baac758d ci: inherit gadfly's default swarm (slim caller, re-pin @b02b11d) (#14)
executus CI / test (push) Successful in 50s
2026-06-28 02:48:25 +00:00
steve 5779035722 Merge pull request 'ci: subscribe to gadfly's reusable review workflow (cloud + Claude Code, no local)' (#13) from ci/gadfly-reusable into main
executus CI / test (push) Successful in 3m56s
2026-06-28 01:43:42 +00:00
Steve Dudenhoeffer 1a2a2364ec security: scope forwarded secrets + pin gadfly reusable to an immutable sha
executus CI / test (pull_request) Successful in 2m13s
Adversarial Review (Gadfly) / review (pull_request) Successful in 10m31s
Address the swarm's findings on this rollout:
- Replace `secrets: inherit` (which forwarded ALL repo secrets — registry/
  Komodo/Discord/DB creds the reviewer never uses) with explicit forwarding of
  only OLLAMA_CLOUD_API_KEY / CLAUDE_CODE_OAUTH_TOKEN / findings tokens.
  GITEA_TOKEN is the automatic job token (github.token in the reusable).
- Pin uses: ...@main -> @20a5c43 (immutable) so a push to gadfly can't change
  the code that runs with our forwarded secrets.

Requires gadfly's review-reusable.yml secrets contract (steve/gadfly#9, merged).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 21:18:59 -04:00
Steve Dudenhoeffer c08ce47fa6 ci: subscribe to gadfly's reusable review workflow (cloud + Claude Code, no local)
executus CI / test (pull_request) Successful in 47s
Adversarial Review (Gadfly) / review (pull_request) Successful in 12m29s
Replace the full self-contained stub with a thin caller of steve/gadfly's
reusable workflow, using gadfly's own dogfood config: 6 cloud models +
the Claude Code engine (sonnet, opus, opus:max). No local Macs / foreman.
Advisory only.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 20:03:00 -04:00
steve 784d5d7ce4 run: PostRun detached ctx + panic-isolated Cleanup (gadfly #12)
executus CI / test (pull_request) Successful in 45s
executus CI / test (push) Successful in 1m47s
Two convergent gadfly refinements on the PostRun wiring:
- PostRun now runs on detach(ctx), not the caller's ctx — a finished/cancelled
  caller no longer aborts artifact production (3-model: glm-5.2/minimax/deepseek).
- Cleanup is panic-isolated via safeCleanup (recover+log), matching runPostRun, so
  a misbehaving teardown can't clobber an otherwise-successful run (deepseek).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 18:33:41 -04:00
steve 4e179259de run: wire SessionToolFactory + PostRun artifacts + AttachImages
executus CI / test (pull_request) Successful in 1m49s
Adversarial Review (Gadfly) / review (pull_request) Successful in 5m19s
The session-tool TYPES already lived in tool/ (P4 move) but the executor never
used them. This wires them, unblocking artifact-producing host surfaces (mort's
chat API / chatbot / .skill / scaddy) to run on executus:

- run/session.go: steerMailbox (thread-safe message queue) + runSession
  (tool.AgentSession over it: AttachImages → a user-role multimodal message
  injected before the agent's next step) + runPostRun (panic-isolated hook call).
- executor: create the mailbox + set inv.AttachImages BEFORE the toolbox build;
  add inv.ExtraTools + a SessionToolFactory's per-run Tools to the toolbox; defer
  its Cleanup; merge the session mailbox with the critic's nudges into ONE
  WithSteer; after the run, call PostRun with the full transcript
  (runRes.Messages) → Result.PostRunResult (best-effort, never fails the run).
- run.Result += PostRunResult *tool.PostRunResult.
- dropped the now-dead criticBinding.steerOptions (superseded by drainSteer).

Tests: a factory whose PostRun emits an artifact from the output+transcript +
Cleanup lands on Result.PostRunResult; a factory-added tool is callable.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 18:13:16 -04:00
steve 82a816ae29 ci(gadfly): trim pool to the strong 6 (drop m5/qwen3.6, gemma4, gpt-oss, kimi-k2.7)
executus CI / test (push) Successful in 46s
Pool now: minimax-m3, glm-5.2, glm-5.1, deepseek-v4-pro, nemotron-3-super,
qwen3-coder:480b (all cloud, ollama-cloud=3). Removed the low-value reviewers +
the last local endpoint (m5).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 18:06:36 -04:00
steve be4bbbcad5 run: fix statusFor — don't relabel a generic error / caller-cancel as timeout (gadfly #11)
executus CI / test (pull_request) Successful in 47s
executus CI / test (push) Successful in 45s
The WithCancelCause+timer rewrite made MaxRuntime surface as Canceled (not
DeadlineExceeded), so statusFor's context.Cause(DeadlineExceeded) check could
relabel (a) a genuine run error as 'timeout' and (b) a caller cancel/deadline as
'timeout' (was 'cancelled'). Convergent gadfly finding (glm-5.2 + cluster).

Fix: keep MaxRuntime as WithTimeout (its DeadlineExceeded propagates → 'timeout',
preserving own-timeout vs caller-cancel), add a NESTED WithCancelCause layer only
for the kill. statusFor consults context.Cause ONLY for ErrCriticKill; everything
else is classified by the run error itself. Tests: generic-error-not-relabeled +
caller-cancel-stays-cancelled.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 17:00:26 -04:00
steve 390e6cf905 run: critic parity — fuller RecordStep + cause-carrying Kill (distinct status)
executus CI / test (pull_request) Successful in 46s
Adversarial Review (Gadfly) / review (pull_request) Successful in 22m30s
Completes the run-critic seam so a host adapter (mort's agentcritic) has full
fidelity, closing the two limitations gadfly surfaced on mort #1334.

- RecordStep(iter int, resp *llm.Response): the completed step's model response
  is now passed to the critic (was index-only), so a host that records a trace
  (mort's ProgressRecorder) can show what the agent actually produced, not just
  an iteration count. The executor forwards s.Response; the battery ignores it
  (its Progress is count-based).
- CriticHandle.KillCause() error + ErrCriticKill: the executor now distinguishes
  an explicit critic KILL from a natural backstop expiry. runCtx uses a
  cause-carrying cancel (WithCancelCause + a MaxRuntime timer cancelling with
  DeadlineExceeded); the deadline-watch cancels with ErrCriticKill when
  KillCause()!=nil, else DeadlineExceeded. statusFor reads context.Cause →
  killed / timeout / cancelled are now distinct (were all "cancelled"). The
  battery sets killCause from Decision.KillReason on a Kill.

Tests: statusFor "killed" case (cause=ErrCriticKill, err=Canceled); fake handle
+ battery RecordStep/KillCause signatures. Core stays battery-free.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 16:35:13 -04:00
steve 1a1d5e417b chore: go mod tidy (add missing go.sum entry; CI tidiness gate)
executus CI / test (pull_request) Successful in 2m8s
executus CI / test (push) Successful in 1m45s
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 14:53:58 -04:00
steve f3bd43b726 ci(gadfly): drop the m1 reviewer (dead weight; keep m5)
executus CI / test (pull_request) Failing after 1m1s
m1/qwen3:14b proved consistently low-value + slowest in the pool over multiple
PRs. Removed from GADFLY_MODELS + GADFLY_PROVIDER_CONCURRENCY + its endpoint so it
never fires again. m5 retained.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 14:41:14 -04:00
steve 306d575c31 critic: overflow-guard maxSteps += RaiseStepsBy (gadfly 5-model convergence)
executus CI / test (pull_request) Has been cancelled
A buggy/hostile Escalator returning a huge RaiseStepsBy could wrap handle.maxSteps
negative (which the executor reads as defer-to-base). Clamp at math.MaxInt.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 14:38:48 -04:00
steve 4ba83ab905 run: critic can raise a run's step ceiling mid-flight (CriticHandle.MaxSteps)
executus CI / test (pull_request) Failing after 1m1s
Adversarial Review (Gadfly) / review (pull_request) Successful in 21m8s
Prerequisite for a full-fidelity mort agentcritic adapter (which adjusts a
healthy-but-long run's iteration budget, not just its deadline). executus's
CriticHandle was deadline+steer only; this adds the dynamic step ceiling above
an unchanged majordomo (which already exposes WithMaxStepsFunc).

- run.RunInfo += MaxIterations (the run's base ceiling, so a critic can raise it
  relative to the baseline).
- run.CriticHandle += MaxSteps() int — polled by the executor each step via
  agent.WithMaxStepsFunc; <=0 defers to the base. The executor uses
  WithMaxStepsFunc(critic.MaxSteps) when a critic is active, else WithMaxSteps.
- critic battery: handle.maxSteps (initialised from RunInfo.MaxIterations) +
  MaxSteps(); Decision gains RaiseStepsBy so an Escalator can raise the ceiling
  alongside ExtendBy. ExtendOnce default is unchanged (time-only).

Test: a critic returning MaxSteps=5 lets a base-MaxIterations=1 run complete two
tool-dispatch steps past the base ceiling. Core stays battery-free (run doesn't
import critic).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 14:16:03 -04:00
steve a103cc5e9f ci(gadfly): 9-cloud panel @ 3 models x 3 lenses (9 concurrent)
executus CI / test (push) Failing after 1m57s
Match mort: minimax-m3, glm-5.2, glm-5.1 (SWE-Bench Pro SOTA), kimi-k2.7-code,
deepseek-v4-pro, nemotron-3-super, gpt-oss:120b, qwen3-coder:480b, gemma4 (8
families) + m1/m5 locals. ollama-cloud=3 x lens=3 = 9 concurrent (10 budget).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 12:17:24 -04:00
steve 4d28cd6e2c ci(gadfly): 4-cloud pool — add kimi-k2.7-code + deepseek-v4-pro, drop v4-flash
executus CI / test (push) Failing after 1m2s
Match mort's new cloud panel: minimax-m3, glm-5.2, kimi-k2.7-code (Moonshot),
deepseek-v4-pro (frontier, replaces v4-flash). Keeps m1/m5 locals + the existing
ollama-cloud=1 + lens-concurrency=3 serial-model style.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 11:59:13 -04:00
steve dcaefff756 ci(gadfly): add M1/M5 Macs back to the reviewer pool (full fleet)
executus CI / test (push) Failing after 1m23s
Re-adds the local Macs (m1/qwen3:14b, m5/qwen3.6:35b-mlx) via their foreman endpoints alongside the 3 cloud models. Cloud keeps lens fan-out (ollama-cloud=1 model + lens=3); each Mac runs one model with lenses serial (foreman serializes anyway); all provider lanes parallel. Bumps the job timeout 30->90m for the slow local lanes. With findings telemetry now on, gadfly-reports can quantify whether the Macs earn their keep.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 10:44:22 -04:00
steve 97154395e6 C0b: document recordToolStart post-iteration timing (gadfly glm finding)
executus CI / test (pull_request) Failing after 59s
executus CI / test (push) Failing after 1m1s
majordomo's step observer fires post-iteration, so the critic's activity clock
refreshes per-iteration, not mid-tool — a single long tool call won't refresh it
until it returns. Documented + the host-progress-bridge mitigation (mort's
pattern). A true pre-dispatch hook needs majordomo support (follow-up).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 10:10:56 -04:00
steve 4aa06f652e C0b: address verified gadfly findings (panic-safety + test honesty)
executus CI / test (pull_request) Failing after 58s
From PR #9 (minimax + deepseek):
- Run now has a top-level recover() — the "never propagates a panic" promise was
  unenforced; a panicking host Port (Critic/Audit/Palette) on the run goroutine
  now becomes Result.Err instead of unwinding into the caller.
- The critic deadline-watch goroutine recovers panics from a host Deadline()
  (it's a separate goroutine, so Run's recover can't catch it) — a buggy
  CriticHandle can't crash the process.
- CriticHandle interface documents its concurrency contract (Record*/Steer on the
  run goroutine vs Deadline()/Stop() from the watch goroutine — impls must be
  concurrent-safe; the critic battery already is).
- startCritic's dead `soft <= 0 -> noop` guard (withFallbacks already coerces to
  90s) replaced with a defensive inline 90s default, so a bypass of withFallbacks
  still gets a working critic instead of silently none.
- Delivery tests made honest: the old "error path" test only checked the
  early-return (no delivery); added TestDeliverErrorOnRunFailure (in-loop model
  error -> DeliverError to the target) + renamed the early-return test.

Graded all #9 findings in the gadfly MCP.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 10:09:22 -04:00
steve 43b2471737 C0b: wire Critic + Delivery into run.Executor
executus CI / test (pull_request) Failing after 1m0s
Adversarial Review (Gadfly) / review (pull_request) Successful in 5m9s
Continues finishing the executor's run.Ports wiring (after C0's Palette).

Critic (run/critic.go): when Ports.Critic is set and the agent enables it, the
executor calls Monitor at run start, feeds RecordStep/RecordToolStart from the
step observer, drains the critic's Steer messages into the loop via
agent.WithSteer, and binds the run's hard cancellation to the critic's
(extendable) Deadline through a watch goroutine — a healthy-but-slow run gets
room while a hung one is killed. Stop() on run end. Soft timeout from
Defaults.CriticSoftTimeout (default 90s). nil-safe: no critic / not-enabled =
no-op.

Delivery (run/executor.go deliver): after the run, when Ports.Delivery is set
and inv.DeliveryID is non-empty, the executor posts Result.Output (or
DeliverError on failure) to a host-interpreted deliver.Target
{inv.DeliveryKind, inv.DeliveryID}. Empty target = caller reads Result.Output
itself (the synchronous default; the `.agent run` canary). Best-effort +
detached.

tool.Invocation gains DeliveryKind/DeliveryID (host-set egress target).

Tests: critic monitored/fed/steered/stopped when enabled, untouched when not;
delivery posts on a target, skips without one. Deferred: Checkpointer (needs a
majordomo hook to snapshot the running message history).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 10:00:05 -04:00
steve 0c80679719 C0: address verified gadfly findings (trivial fixes)
executus CI / test (pull_request) Failing after 1m31s
executus CI / test (push) Failing after 1m31s
From the PR #8 review (all graded in the gadfly MCP):
- skip empty palette names + dedupe by final tool name, instead of producing a
  "skill__" tool or an opaque box.Add duplicate error.
- delegationResult: no trailing blank line when a non-ok child produced no output.
- delegationErr: fold a child's partial output into the hard-failure error so it
  isn't silently dropped.

Deferred to C0b (design-level, not trivial): route delegation through the
tool.Registry gate/audit wrappers; expose the skill's real input schema to the
LLM instead of a generic inputs map. typed-nil PaletteSource is left as a caller
contract (the == nil guard catches the untyped-nil interface).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 09:53:11 -04:00
steve 9d41987b0e C0: wire Palette delegation into run.Executor (skill__/agent__ tools)
executus CI / test (pull_request) Failing after 1m2s
Adversarial Review (Gadfly) / review (pull_request) Successful in 3m47s
The first cutover prerequisite: the executor now turns an agent's SkillPalette /
SubAgentPalette into delegation tools so a mort agent that delegates works
through run.Executor (the piece the `.agent run` canary needs beyond the
already-wired audit/budget).

- run/palette.go: addDelegationTools builds a skill__<name> tool (structured
  inputs) per SkillPalette entry and an agent__<name> tool (prompt) per
  SubAgentPalette entry, each invoking run.Ports.Palette as a CHILD of the
  current run (parentRunID = inv.RunID, inheriting caller + channel). A non-ok
  child status is surfaced to the parent with the partial output. nil-safe: no
  PaletteSource or empty palette → no delegation tools (unchanged behavior).
- executor.go: call it right after building the low-level toolbox.

Tests: the model calls skill__helper → routed through Palette with the right
name/caller/inputs/parent; nil palette → run still works.

Deferred to C0b (the remaining run.Ports executor wiring): Critic (soft-timeout
monitor + deadline binding + steer), Delivery (output egress for surfaces that
need executor-side delivery), Checkpointer (needs a majordomo message-history
hook to snapshot resumable state). The `.agent run` canary delivers its returned
Result.Output itself, so these aren't on its critical path.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 09:28:01 -04:00
steve e37cf415de ci(gadfly): emit findings to gadfly-reports + bump image to sha-d7f364d
executus CI / test (push) Failing after 2m40s
Adds GADFLY_FINDINGS_URL / GADFLY_FINDINGS_TOKEN (user-scope secrets) so each review POSTs its run + findings to the gadfly-reports store, and bumps the pinned gadfly image to sha-d7f364d (the build carrying the findings-emit). Advisory only — emit failures never affect the review.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 09:12:46 -04:00
steve a87e7d2c72 fix: address verified gadfly P5 findings (canary robustness)
executus CI / test (pull_request) Failing after 3s
executus CI / test (push) Failing after 1m9s
All 3 cloud models converged (all "minor" — example code, no blocking):
- Consolidate: a model whose every lens errored now reads "review incomplete",
  not a misleading "no issues found" (all 3 models). + test.
- Consolidate: swarm-cancelled (unattributed) cells now surface a "swarm
  cancelled — N cell(s) did not run" banner instead of vanishing (all 3). + test.
- main: io.ReadAll(os.Stdin) error is surfaced (all 3); a TTY stdin no longer
  hangs forever (TTY guard, minimax).
- providerOf: a bare tier name now keys its own PerKey bucket instead of all
  bare tiers collapsing onto "tier" (minimax, glm-5.2) — distinct tiers throttle
  independently.
- Review doc reworded (the closure, not fanout, carries per-cell errors).

Left as documented example-scope behavior: no per-cell timeout (caller supplies
ctx), unknown-severity → lowest rank (no crash).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 00:34:01 -04:00
steve ea9475da54 P5: light-tier canary — gadfly-shaped reviewer on executus core
executus CI / test (pull_request) Failing after 1m5s
Adversarial Review (Gadfly) / review (pull_request) Successful in 8m18s
examples/reviewer proves the core is sufficient for a static-binary light host
(gadfly's shape) with NO batteries:
- config.Env + model.Configure  -> env-driven model fleet + tier overrides
- model.ParseModelForContext    -> tier resolution + failover
- fanout.Run (PerKey caps)      -> N models x M lenses swarm, per-provider bound
- model.GenerateWith[T]         -> structured findings per (model, lens) cell
- Consolidate                   -> one verdict-led report section per model

Hermetic test runs the full 2x3 swarm against majordomo's fake provider and
asserts the consolidated verdicts. A go list -deps CI check asserts the canary
imports ZERO batteries (the light-tier invariant) — gadfly's go.sum stays free
of gorm/redis/discordgo/sqlite. README + docs updated.

This is the canary; migrating the LIVE gadfly repo onto executus core is a
follow-up (kept separate to not destabilize the active reviewer).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 00:22:02 -04:00
23 changed files with 1694 additions and 93 deletions
+22 -47
View File
@@ -1,11 +1,8 @@
# Gadfly — agentic adversarial PR reviewer (https://gitea.stevedudenhoeffer.com/steve/gadfly).
#
# Runs the published Gadfly image (pinned to an immutable :sha- tag — act_runner
# caches :latest, and this build is what carries foreman provider-type support)
# as a specialist swarm and posts
# ONE consolidated review comment as gitea-actions. Advisory only — never blocks a
# merge. This reviews executus PRs with 3 ollama-cloud models (3-lens suite). Gadfly
# is a simple system — findings are advisory; always double-check before acting.
# Gadfly adversarial review — subscribes to steve/gadfly's reusable workflow and
# INHERITS its default swarm. This stub holds only the triggers, the actor gate,
# secret forwarding, and the allow-list; the swarm config (models, lenses,
# concurrency, timeouts) lives centrally in gadfly's review-reusable.yml so it is
# tuned in ONE place. Advisory only — never blocks a merge.
name: Adversarial Review (Gadfly)
@@ -32,48 +29,26 @@ concurrency:
jobs:
review:
# Security: only trusted users may trigger a secret-bearing run via a PR
# comment (pull_request + workflow_dispatch are already trusted). Mirrors
# GADFLY_ALLOWED_USERS, the in-container belt-and-suspenders check.
# comment (pull_request + workflow_dispatch are already trusted). Mirrors the
# allowed_users input below (the in-container belt-and-suspenders check) — both
# lists must stay in sync; a workflow if: can't read a workflow_call input.
if: >-
github.event_name != 'issue_comment'
|| (github.event.issue.pull_request
&& (github.actor == 'steve'
|| github.actor == 'fizi'
|| github.actor == 'dazed'))
runs-on: ubuntu-latest
# 3 cloud models, all concurrent, 3-lens suite. ~12 min typical.
timeout-minutes: 30
steps:
- uses: docker://gitea.stevedudenhoeffer.com/steve/gadfly:sha-d0de034
env:
GITEA_API: ${{ github.server_url }}/api/v1/repos/${{ github.repository }}
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
OLLAMA_CLOUD_API_KEY: ${{ secrets.OLLAMA_CLOUD_API_KEY }}
# executus uses CLOUD MODELS ONLY. The local Macs (m1/m5) were dropped:
# on a P2-review measurement they took 2629 min (with lens timeouts)
# and contributed ZERO real findings — the two cloud models found every
# genuine bug in 612 min. Cloud-only is faster AND higher-signal.
# 3 cloud models. Concurrency now lives in the LENSES, not the models:
# one model runs at a time (PROVIDER_CONCURRENCY=1) with its 3 lenses
# concurrent (PROVIDER_LENS_CONCURRENCY=3). So the first model's
# comment lands sooner and each model finishes a bit faster, at the
# cost of the other two models' comments arriving in series after it.
GADFLY_MODELS: "minimax-m3:cloud,deepseek-v4-flash:cloud,glm-5.2:cloud"
GADFLY_PROVIDER_CONCURRENCY: "ollama-cloud=1"
GADFLY_PROVIDER_LENS_CONCURRENCY: "ollama-cloud=3"
# Default => the 3-lens suite (security, correctness, error-handling).
# Set the repo var GADFLY_SPECIALISTS to override (csv / "all" / "auto").
GADFLY_SPECIALISTS: ${{ vars.GADFLY_SPECIALISTS || 'security,correctness,error-handling' }}
# Per-lens deadline + bounded steps so the slow local models stay sane.
GADFLY_TIMEOUT_SECS: "600"
GADFLY_MAX_STEPS: "14"
# Allow-list for the comment trigger (mirrors the job-level if: guard).
GADFLY_ALLOWED_USERS: "steve,fizi,dazed"
# --- event context (leave as-is) ---
EVENT_NAME: ${{ github.event_name }}
PR: ${{ github.event.pull_request.number || github.event.issue.number || github.event.inputs.pr_number }}
PR_BRANCH: ${{ github.head_ref }}
IS_DRAFT: ${{ github.event.pull_request.draft }}
COMMENT_BODY: ${{ github.event.comment.body }}
COMMENT_ID: ${{ github.event.comment.id }}
ACTOR: ${{ github.actor }}
# Tracks gadfly's v1 release tag — a curated pointer re-moved on each release
# (unlike @main, which moves on every push). Central swarm tuning propagates
# here automatically; the tradeoff vs a full sha pin is that v1 is mutable.
uses: steve/gadfly/.gitea/workflows/review-reusable.yml@v1
# Least privilege: forward only the review secrets (not `secrets: inherit`,
# which would expose every repo secret). GITEA_TOKEN is the automatic token.
secrets:
OLLAMA_CLOUD_API_KEY: ${{ secrets.OLLAMA_CLOUD_API_KEY }}
CLAUDE_CODE_OAUTH_TOKEN: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }}
GADFLY_FINDINGS_URL: ${{ secrets.GADFLY_FINDINGS_URL }}
GADFLY_FINDINGS_TOKEN: ${{ secrets.GADFLY_FINDINGS_TOKEN }}
with:
# Consumer-specific allow-list; everything else is inherited.
allowed_users: "steve,fizi,dazed"
+11
View File
@@ -104,6 +104,17 @@ jobs:
fi
echo "OK: core go.sum is free of host/DB dependencies."
- name: Light-tier canary imports no battery
run: |
# examples/reviewer is gadfly's shape on the CORE only. If it ever
# pulls in a battery (audit/budget/persona/skill/critic/schedule/
# checkpoint/contrib), the light path has regressed.
LEAK=$(go list -deps ./examples/reviewer/... | grep -E 'executus/(audit|budget|persona|skill|critic|schedule|checkpoint|contrib)' || true)
if [ -n "$LEAK" ]; then
echo "ERROR: light-tier canary pulled in a battery:"; echo "$LEAK"; exit 1
fi
echo "OK: examples/reviewer is core-only."
- name: contrib/store (nested SQLite module — isolated from core)
run: |
# contrib/store is a SEPARATE module carrying modernc.org/sqlite; the
+5 -4
View File
@@ -47,9 +47,10 @@ CORE (majordomo + stdlib):
toolbox + majordomo loop + compaction +
run-bounding (V10 detached timeout) + step/
audit observers + Budget gate; RunnableAgent
DTO + nil-safe run.Ports. Follow-ups: wire
Critic/Checkpointer/PaletteSource/Delivery,
Phases, and the no-tools direct path [P2]
DTO + nil-safe run.Ports. Palette delegation +
Critic (monitor/deadline/steer) + Delivery
WIRED. Follow-ups: Checkpointer (needs a
majordomo msg-history hook), Phases [C0c]
dispatchguard/ loop/depth/fan-out caps [P0 ✓]
pendingattach/ attachment dedupe [P0 ✓]
tool/ registry + 3-stage permissions + ssrf [P1 ✓]
@@ -115,7 +116,7 @@ repackaging.
P0 module + zero-coupling moves + core seams (this) → P1 tool registry + model →
P2 run kernel + Ports inversion → P3 generic tools + defaults → P4 persona/skill
redesign + batteries + SQLite store → P5 gadfly on core (light-tier canary) → P6
redesign + batteries + SQLite store → P5 gadfly-on-core canary (examples/reviewer ✓) → P6
rewire mort + tag v0.1.0. The mort-side rewrite reuses mort's existing
`mort_*_adapters.go` wall as the host adapter layer.
+3
View File
@@ -48,6 +48,9 @@ bot) — mort and gadfly are the first two consumers (heavy and light). See
- `config/`, `deliver/`, `identity/` — host seams (config / output / identity),
each with a shipped default.
- `dispatchguard/`, `pendingattach/` — run-safety primitives.
- `examples/reviewer` — a **gadfly-shaped PR reviewer on the core only** (env-config
model fleet → `fanout` N×M swarm → `model.GenerateWith[T]` structured findings →
consolidation), the light-tier canary; CI asserts it pulls in no battery.
## Design
+46 -10
View File
@@ -10,13 +10,17 @@
// Mort plugs its LLM critic-agent in as an Escalator; ExtendOnce is the
// zero-dependency default.
//
// NOTE: the executor's call into run.Ports.Critic is a P2 follow-up; this
// battery provides the seam + impl ahead of that wiring.
// The executor wires run.Ports.Critic (C0b): it feeds the handle activity,
// binds the run context to its extendable Deadline, drains its Steer, and polls
// MaxSteps each step so an Escalator can also raise a long run's step ceiling
// (Decision.RaiseStepsBy).
package critic
import (
"context"
"errors"
"log/slog"
"math"
"sync"
"time"
@@ -36,10 +40,11 @@ type Progress struct {
// Decision is the Escalator's verdict for a stalled run. Zero value = do
// nothing (let the hard backstop eventually kill a truly hung run).
type Decision struct {
Nudge []llm.Message // injected before the agent's next turn (a steer)
ExtendBy time.Duration // push the hard deadline out by this much
Kill bool // cancel the run now
KillReason string
Nudge []llm.Message // injected before the agent's next turn (a steer)
ExtendBy time.Duration // push the hard deadline out by this much
RaiseStepsBy int // raise the run's tool-dispatch step ceiling by this
Kill bool // cancel the run now
KillReason string
}
// Escalator decides what to do when a run crosses its soft timeout. It is
@@ -136,6 +141,7 @@ func (s *System) Monitor(ctx context.Context, info run.RunInfo, softTimeout time
now: s.now,
lastActivity: now,
deadline: now.Add(time.Duration(float64(softTimeout) * s.backstopMul)),
maxSteps: info.MaxIterations, // base ceiling; an Escalator may RaiseStepsBy
stopCh: make(chan struct{}),
}
go h.watch(ctx, check)
@@ -155,13 +161,17 @@ type handle struct {
deadline time.Time
steer []llm.Message
iterations int
maxSteps int // current tool-dispatch ceiling (base MaxIterations, raised by RaiseStepsBy)
lastTool string
killed bool // sticky: once an Escalator kills, no later decision un-kills it
killed bool // sticky: once an Escalator kills, no later decision un-kills it
killCause error // non-nil once killed; surfaced via KillCause for "killed" status
stopped bool
stopCh chan struct{}
}
func (h *handle) RecordStep(iter int) {
func (h *handle) RecordStep(iter int, _ *llm.Response) {
// This battery's Progress tracks iteration count + activity, not per-step
// payload, so the response is unused here; a richer Escalator could record it.
h.mu.Lock()
h.iterations = iter
h.lastActivity = h.now()
@@ -192,6 +202,18 @@ func (h *handle) Deadline() time.Time {
return h.deadline
}
func (h *handle) MaxSteps() int {
h.mu.Lock()
defer h.mu.Unlock()
return h.maxSteps
}
func (h *handle) KillCause() error {
h.mu.Lock()
defer h.mu.Unlock()
return h.killCause
}
func (h *handle) Stop() {
h.mu.Lock()
if !h.stopped {
@@ -254,8 +276,13 @@ func (h *handle) tick(ctx context.Context) {
}
if d.Kill {
h.killed = true
h.deadline = h.now() // immediate hard deadline → executor cancels
return // ignore any Nudge/ExtendBy paired with a Kill
reason := d.KillReason
if reason == "" {
reason = "critic killed the run"
}
h.killCause = errors.New(reason) // surfaced via KillCause → "killed" status
h.deadline = h.now() // immediate hard deadline → executor cancels
return // ignore any Nudge/ExtendBy paired with a Kill
}
if len(d.Nudge) > 0 {
h.steer = append(h.steer, d.Nudge...)
@@ -263,4 +290,13 @@ func (h *handle) tick(ctx context.Context) {
if d.ExtendBy > 0 {
h.deadline = h.deadline.Add(d.ExtendBy)
}
if d.RaiseStepsBy > 0 {
// Overflow-safe: a buggy Escalator returning a huge delta must not wrap
// maxSteps negative (which the executor would read as "defer to base").
if d.RaiseStepsBy > math.MaxInt-h.maxSteps {
h.maxSteps = math.MaxInt
} else {
h.maxSteps += d.RaiseStepsBy
}
}
}
+1 -1
View File
@@ -51,7 +51,7 @@ func TestMonitorEscalatesOncePerIdlePeriodAndExtends(t *testing.T) {
t.Error("deadline should have been extended past the original")
}
// A fresh step re-arms; another idle period escalates again.
h.RecordStep(1)
h.RecordStep(1, nil)
time.Sleep(60 * time.Millisecond)
mu.Lock()
c2 := calls
+38
View File
@@ -0,0 +1,38 @@
# examples/reviewer — the light-tier canary
A **gadfly-shaped adversarial PR reviewer built on the executus core only** — no
batteries, no database, no host adapters. It exists to prove that the core is
sufficient for a static-binary light host (gadfly's shape), and that such a host
keeps a `go.sum` free of `gorm`/`redis`/`discordgo`/`sqlite`.
What it exercises, all from core:
| Concern | executus core piece |
|---|---|
| Env-driven model fleet + tier overrides | `config.Env` + `model.Configure` |
| Tier resolution + failover | `model.ParseModelForContext` |
| N models × M lenses swarm | `fanout.Run` (with `PerKey` per-provider caps) |
| Structured findings per cell | `model.GenerateWith[T]` |
| One report section per model, worst-verdict-led | `Consolidate` (local) |
## Run
```sh
REVIEWER_MODELS=fast,thinking \
ANTHROPIC_API_KEY=sk-... \
go run ./examples/reviewer -diff "$(git diff HEAD~1)"
```
Config (all optional, `REVIEWER_`-prefixed env):
- `REVIEWER_MODELS` — csv of tier names / `provider/model` specs (default `fast`)
- `REVIEWER_MODEL_TIER_<NAME>` — override a tier's resolved spec
- `REVIEWER_MAX_CONCURRENT` — total in-flight swarm cells (default 6)
- `REVIEWER_PROVIDER_CONCURRENCY` — per-provider cap (default 3)
## Test
`reviewer_test.go` runs the whole swarm against majordomo's fake provider
(hermetic, no network) and asserts the consolidated verdicts. A `go list -deps`
check in CI confirms the package pulls in no battery and no DB driver — the
light-tier invariant.
+110
View File
@@ -0,0 +1,110 @@
package main
import (
"context"
"flag"
"fmt"
"io"
"os"
"strings"
"gitea.stevedudenhoeffer.com/steve/executus/config"
"gitea.stevedudenhoeffer.com/steve/executus/fanout"
"gitea.stevedudenhoeffer.com/steve/executus/model"
)
// DefaultLenses is the canary's review suite (mirrors gadfly's default).
var DefaultLenses = []Lens{
{Name: "security", Focus: "auth, injection, secret leakage, unsafe deserialization, SSRF."},
{Name: "correctness", Focus: "logic errors, broken invariants, off-by-one, contract violations."},
{Name: "error-handling", Focus: "swallowed errors, missing timeouts, races, unhandled edge cases."},
}
// Reviewer is configured entirely from the environment (the GADFLY_*-style light
// host): REVIEWER_MODELS (csv of tier/spec), REVIEWER_MODEL_TIER_<NAME> overrides,
// REVIEWER_MAX_CONCURRENT, REVIEWER_PROVIDER_CONCURRENCY. The diff is read from
// -diff or stdin.
//
// REVIEWER_MODELS=fast,thinking ANTHROPIC_API_KEY=... go run ./examples/reviewer < my.diff
func main() {
cfg := config.Env("REVIEWER_")
// Tier table from env, with code defaults.
model.Configure(cfg, map[string]string{
"fast": "anthropic/claude-haiku-4-5",
"thinking": "anthropic/claude-opus-4-8",
}, 0)
fleet := splitCSV(cfg.String("models", "fast"))
maxConc := cfg.Int("max_concurrent", 6)
perProvider := cfg.Int("provider_concurrency", 3)
diffFlag := flag.String("diff", "", "diff text to review; reads stdin when empty")
flag.Parse()
diff := *diffFlag
if strings.TrimSpace(diff) == "" {
// Guard against blocking forever on an interactive TTY (no piped input).
if fi, _ := os.Stdin.Stat(); fi != nil && fi.Mode()&os.ModeCharDevice != 0 {
fmt.Fprintln(os.Stderr, "reviewer: no diff (pass -diff or pipe one on stdin)")
os.Exit(2)
}
b, err := io.ReadAll(os.Stdin)
if err != nil {
fmt.Fprintf(os.Stderr, "reviewer: reading stdin: %v\n", err)
os.Exit(2)
}
diff = string(b)
}
if strings.TrimSpace(diff) == "" {
fmt.Fprintln(os.Stderr, "reviewer: no diff (pass -diff or pipe one on stdin)")
os.Exit(2)
}
ctx := context.Background()
var models []NamedModel
for _, spec := range fleet {
_, m, err := model.ParseModelForContext(ctx, spec)
if err != nil {
fmt.Fprintf(os.Stderr, "reviewer: resolve model %q: %v\n", spec, err)
os.Exit(1)
}
models = append(models, NamedModel{Name: spec, Provider: providerOf(spec), Model: m})
}
results := Review(ctx, models, DefaultLenses, diff, fanout.Options[cell]{
MaxConcurrent: maxConc,
PerKey: perKeyCaps(models, perProvider),
})
fmt.Print(Consolidate(results))
}
func splitCSV(s string) []string {
var out []string
for _, p := range strings.Split(s, ",") {
if p = strings.TrimSpace(p); p != "" {
out = append(out, p)
}
}
return out
}
// providerOf returns a model spec's provider (the first path segment, e.g.
// "anthropic/claude-…" → "anthropic"; a bare tier name → itself).
func providerOf(spec string) string {
if i := strings.IndexByte(spec, '/'); i > 0 {
return spec[:i]
}
return spec // bare tier name → its own bucket (don't collapse distinct tiers)
}
// perKeyCaps builds the PerKey map: each distinct provider capped at perProvider.
func perKeyCaps(models []NamedModel, perProvider int) map[string]int {
if perProvider <= 0 {
return nil
}
caps := map[string]int{}
for _, m := range models {
caps[m.Provider] = perProvider
}
return caps
}
+204
View File
@@ -0,0 +1,204 @@
// Command reviewer is executus's light-tier CANARY: a gadfly-shaped adversarial
// PR reviewer built on the executus CORE ONLY — no batteries, no DB, no host.
// It proves the core is sufficient for a static-binary host like gadfly:
//
// - config.Env → env-driven model fleet + concurrency (GADFLY_*-style)
// - model.Configure/... → tier resolution + failover over majordomo
// - fanout.Run → the N-models × M-lenses swarm, with per-provider caps
// - model.GenerateWith[T] → structured findings per (model, lens)
// - consolidation → one report section per model, worst-verdict-led
//
// The whole thing imports only executus core packages, so a binary built from it
// keeps a go.sum free of gorm/redis/discordgo/sqlite — the light-tier invariant.
//
// See reviewer_test.go for the hermetic swarm test (majordomo's fake provider).
package main
import (
"context"
"fmt"
"sort"
"strings"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/executus/fanout"
"gitea.stevedudenhoeffer.com/steve/executus/model"
)
// Severity orders findings; the rank drives a model's worst-verdict header.
type Severity string
const (
SevTrivial Severity = "trivial"
SevSmall Severity = "small"
SevMedium Severity = "medium"
SevHigh Severity = "high"
SevCritical Severity = "critical"
)
func severityRank(s Severity) int {
switch s {
case SevCritical:
return 4
case SevHigh:
return 3
case SevMedium:
return 2
case SevSmall:
return 1
default:
return 0
}
}
// Finding is one issue a lens reports. It is the structured-output schema the
// model must satisfy (majordomo derives the JSON schema from this struct).
type Finding struct {
Severity Severity `json:"severity" jsonschema:"enum=trivial,enum=small,enum=medium,enum=high,enum=critical"`
Title string `json:"title"`
Detail string `json:"detail"`
}
// lensReport is the per-(model,lens) structured response.
type lensReport struct {
Findings []Finding `json:"findings"`
}
// Lens is one review dimension (security / correctness / …).
type Lens struct {
Name string
Focus string // appended to the base system prompt
}
// NamedModel is a resolved model plus the label + provider used for fan-out
// keying (per-provider concurrency) and reporting.
type NamedModel struct {
Name string // display label (the tier/spec the host configured)
Provider string // fan-out key for PerKey concurrency (e.g. "ollama-cloud")
Model llm.Model
}
// LensResult is one swarm cell's outcome.
type LensResult struct {
Model string
Lens string
Findings []Finding
Err error
}
const baseSystemPrompt = "You are an adversarial code reviewer. Review the diff for real, verifiable problems only — no style nits. Return ONLY JSON matching the schema. Report nothing if you find nothing."
// Review runs every (model × lens) cell of the swarm concurrently, bounded by
// opts (total + per-provider caps), and returns one LensResult per cell. A cell
// whose model call fails carries the error in LensResult.Err — one bad cell
// never aborts the swarm (the closure embeds per-cell errors in LensResult.Err).
func Review(ctx context.Context, models []NamedModel, lenses []Lens, diff string, opts fanout.Options[cell]) []LensResult {
cells := make([]cell, 0, len(models)*len(lenses))
for _, m := range models {
for _, l := range lenses {
cells = append(cells, cell{model: m, lens: l})
}
}
// Key each cell by its provider so PerKey throttles per backend (the
// GADFLY_PROVIDER_CONCURRENCY analogue).
if opts.Key == nil {
opts.Key = func(c cell) string { return c.model.Provider }
}
results := fanout.Run(ctx, cells, opts, func(ctx context.Context, c cell) (LensResult, error) {
sys := baseSystemPrompt
if c.lens.Focus != "" {
sys += "\n\nLens — " + c.lens.Name + ": " + c.lens.Focus
}
msgs := []llm.Message{{Role: llm.RoleUser, Parts: []llm.Part{llm.Text("Diff under review:\n" + diff)}}}
rep, err := model.GenerateWith[lensReport](ctx, c.model.Model, sys, msgs)
lr := LensResult{Model: c.model.Name, Lens: c.lens.Name, Findings: rep.Findings, Err: err}
// Return the value either way (err embedded) so every cell reports.
return lr, nil
})
out := make([]LensResult, 0, len(results))
for _, r := range results {
if r.Err != nil { // a swarm-level error (ctx cancel) with no value
out = append(out, LensResult{Err: r.Err})
continue
}
out = append(out, r.Value)
}
return out
}
// cell is one (model, lens) swarm task.
type cell struct {
model NamedModel
lens Lens
}
// Consolidate renders the swarm's results into one report: a section per model,
// each led by that model's worst finding severity, mirroring gadfly's
// one-comment-per-model output.
func Consolidate(results []LensResult) string {
byModel := map[string][]LensResult{}
var order []string
aborted := 0 // cells dropped before running (swarm cancelled) — no model attribution
for _, r := range results {
if r.Model == "" {
if r.Err != nil {
aborted++
}
continue
}
if _, ok := byModel[r.Model]; !ok {
order = append(order, r.Model)
}
byModel[r.Model] = append(byModel[r.Model], r)
}
sort.Strings(order)
var b strings.Builder
if aborted > 0 {
fmt.Fprintf(&b, "> ⚠ swarm cancelled — %d cell(s) did not run; results below are partial.\n\n", aborted)
}
for _, m := range order {
rs := byModel[m]
var all []Finding
worst := -1
errored := 0
for _, r := range rs {
if r.Err != nil {
errored++
continue
}
all = append(all, r.Findings...)
for _, f := range r.Findings {
if severityRank(f.Severity) > worst {
worst = severityRank(f.Severity)
}
}
}
// A model whose every lens errored produced NO data — saying "no issues
// found" would be misleading, so it gets its own verdict.
successful := len(rs) - errored
verdict := "no issues found"
switch {
case successful == 0 && errored > 0:
verdict = "review incomplete"
case worst >= severityRank(SevHigh):
verdict = "blocking issues found"
case worst >= 0:
verdict = "minor issues"
}
fmt.Fprintf(&b, "## %s — %s", m, verdict)
if errored > 0 {
fmt.Fprintf(&b, " (⚠ %d lens(es) errored)", errored)
}
b.WriteString("\n")
sort.SliceStable(all, func(i, j int) bool {
return severityRank(all[i].Severity) > severityRank(all[j].Severity)
})
for _, f := range all {
fmt.Fprintf(&b, "- [%s] %s — %s\n", f.Severity, f.Title, f.Detail)
}
b.WriteString("\n")
}
return b.String()
}
+128
View File
@@ -0,0 +1,128 @@
package main
import (
"context"
"strings"
"testing"
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
"gitea.stevedudenhoeffer.com/steve/executus/fanout"
)
// TestReviewSwarm proves the light-tier path end-to-end against the fake
// provider: a 2-model × 3-lens swarm runs, structured findings parse, and
// consolidation produces one verdict-led section per model — no batteries, no
// network.
func TestReviewSwarm(t *testing.T) {
fp := fake.New("fakeprov")
// Model "hot" reports a high-severity finding on every lens; "cold" reports
// nothing. Each model is called once per lens (3×), so enqueue 3 each.
hot := `{"findings":[{"severity":"high","title":"SQL injection","detail":"unsanitized id in query"}]}`
cold := `{"findings":[]}`
for i := 0; i < 3; i++ {
fp.Enqueue("hot", fake.Reply(hot))
fp.Enqueue("cold", fake.Reply(cold))
}
hotM, err := fp.Model("hot")
if err != nil {
t.Fatal(err)
}
coldM, err := fp.Model("cold")
if err != nil {
t.Fatal(err)
}
models := []NamedModel{
{Name: "hot", Provider: "fakeprov", Model: hotM},
{Name: "cold", Provider: "fakeprov", Model: coldM},
}
lenses := []Lens{{Name: "security"}, {Name: "correctness"}, {Name: "error-handling"}}
results := Review(context.Background(), models, lenses, "some diff",
fanout.Options[cell]{MaxConcurrent: 6, PerKey: map[string]int{"fakeprov": 3}})
// 2 models × 3 lenses = 6 cells, all successful.
if len(results) != 6 {
t.Fatalf("got %d cells, want 6", len(results))
}
var hotFindings, coldFindings, errs int
for _, r := range results {
if r.Err != nil {
errs++
continue
}
switch r.Model {
case "hot":
hotFindings += len(r.Findings)
case "cold":
coldFindings += len(r.Findings)
}
}
if errs != 0 {
t.Errorf("expected no cell errors, got %d", errs)
}
if hotFindings != 3 { // one per lens
t.Errorf("hot model findings = %d, want 3", hotFindings)
}
if coldFindings != 0 {
t.Errorf("cold model findings = %d, want 0", coldFindings)
}
report := Consolidate(results)
if !strings.Contains(report, "hot — blocking issues found") {
t.Errorf("hot section should lead with a blocking verdict:\n%s", report)
}
if !strings.Contains(report, "cold — no issues found") {
t.Errorf("cold section should report no issues:\n%s", report)
}
if !strings.Contains(report, "SQL injection") {
t.Errorf("report should surface the finding:\n%s", report)
}
}
// TestConsolidateVerdicts checks the worst-severity-led header logic.
func TestConsolidateVerdicts(t *testing.T) {
got := Consolidate([]LensResult{
{Model: "m", Lens: "a", Findings: []Finding{{Severity: SevSmall, Title: "x"}}},
{Model: "m", Lens: "b", Findings: []Finding{{Severity: SevMedium, Title: "y"}}},
})
if !strings.Contains(got, "m — minor issues") {
t.Errorf("medium-max should be 'minor issues', got:\n%s", got)
}
// An errored lens is surfaced in the header.
got = Consolidate([]LensResult{
{Model: "m", Lens: "a", Findings: []Finding{{Severity: SevCritical, Title: "boom"}}},
{Model: "m", Lens: "b", Err: context.DeadlineExceeded},
})
if !strings.Contains(got, "blocking issues found") || !strings.Contains(got, "errored") {
t.Errorf("critical + errored lens header wrong:\n%s", got)
}
}
// TestConsolidateAllErrored: a model whose every lens errored must NOT be
// labelled "no issues found" (the gadfly P5 finding).
func TestConsolidateAllErrored(t *testing.T) {
got := Consolidate([]LensResult{
{Model: "m", Lens: "a", Err: context.DeadlineExceeded},
{Model: "m", Lens: "b", Err: context.DeadlineExceeded},
})
if !strings.Contains(got, "m — review incomplete") {
t.Errorf("all-errored model should be 'review incomplete', got:\n%s", got)
}
if strings.Contains(got, "no issues found") {
t.Errorf("all-errored model must not say 'no issues found':\n%s", got)
}
}
// TestConsolidateSwarmCancelled: dropped (unattributed) cells surface a banner.
func TestConsolidateSwarmCancelled(t *testing.T) {
got := Consolidate([]LensResult{
{Err: context.Canceled}, // dropped cell, no model
{Model: "m", Lens: "a", Findings: []Finding{{Severity: SevSmall, Title: "x"}}},
})
if !strings.Contains(got, "swarm cancelled") {
t.Errorf("dropped cells should surface a cancellation banner:\n%s", got)
}
}
+1
View File
@@ -125,6 +125,7 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+124
View File
@@ -0,0 +1,124 @@
package run
import (
"context"
"fmt"
"time"
"gitea.stevedudenhoeffer.com/steve/majordomo/agent"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
)
// criticDeadlineCheck is how often the deadline-watch goroutine polls the
// critic's hard deadline. Small relative to any realistic soft timeout.
const criticDeadlineCheck = time.Second
// criticBinding wires a CriticHandle into a run: the executor forwards activity
// (steps + tool starts) to it, binds the run's hard cancellation to the critic's
// extendable deadline, and exposes the critic's Steer messages as an agent
// RunOption. All methods are nil-safe so the executor can call them
// unconditionally when no critic is configured.
type criticBinding struct {
h CriticHandle
}
// startCritic begins critic monitoring for this run when one is configured and
// the agent enables it. It launches a goroutine that cancels runCtx (via
// cancelCause) the moment the critic's hard deadline passes — the critic may
// extend that deadline, so a healthy-but-slow run is given room while a hung one
// is killed. When the deadline passes because the critic KILLED the run
// (KillCause() != nil), the cancellation cause is ErrCriticKill (→ status
// "killed"); when the backstop simply expired, it is context.DeadlineExceeded (→
// "timeout"). Returns (nil, no-op stop) when there is no critic. The caller MUST
// defer the returned stop.
func (e *Executor) startCritic(runCtx context.Context, cancelCause context.CancelCauseFunc, ra RunnableAgent, info RunInfo) (*criticBinding, func()) {
noop := func() {}
if e.cfg.Ports.Critic == nil || !ra.Critic.Enabled {
return nil, noop
}
soft := e.cfg.Defaults.CriticSoftTimeout
if soft <= 0 {
soft = 90 * time.Second // defensive: withFallbacks normally guarantees >0
}
h := e.cfg.Ports.Critic.Monitor(runCtx, info, soft)
if h == nil {
return nil, noop
}
done := make(chan struct{})
go func() {
// A host CriticHandle.Deadline() that panics must not crash the process
// (this runs on its own goroutine, so the executor's top-level recover
// can't catch it). Log-free best-effort: just stop watching.
defer func() { _ = recover() }()
t := time.NewTicker(criticDeadlineCheck)
defer t.Stop()
for {
select {
case <-done:
return
case <-runCtx.Done():
return
case <-t.C:
// A zero deadline = no hard cap (not yet set); otherwise cancel
// once we're at or past it, distinguishing an explicit kill from a
// natural backstop expiry so the run gets the right status.
if d := h.Deadline(); !d.IsZero() && !time.Now().Before(d) {
if cause := h.KillCause(); cause != nil {
cancelCause(fmt.Errorf("%w: %s", ErrCriticKill, cause.Error()))
} else {
cancelCause(context.DeadlineExceeded)
}
return
}
}
}
}()
return &criticBinding{h: h}, func() {
close(done)
h.Stop()
}
}
func (b *criticBinding) recordStep(iter int, resp *llm.Response) {
if b != nil {
b.h.RecordStep(iter, resp)
}
}
// recordToolStart forwards a tool call to the critic. NOTE: majordomo's step
// observer only fires AFTER an iteration completes, so this currently lands
// post-tool, not at dispatch — the activity clock is refreshed once per
// iteration, not mid-tool. A single very long tool call (e.g. a 30-min render)
// therefore won't refresh the clock until it returns; a host that runs such
// tools should feed interim progress to its Critic (mort's InstallProgressBridge
// pattern). A true pre-dispatch refresh needs a majordomo hook (follow-up).
func (b *criticBinding) recordToolStart(name, args string) {
if b != nil {
b.h.RecordToolStart(name, args)
}
}
// maxStepsOption returns the agent step-ceiling Option. With no critic it's a
// fixed WithMaxSteps(base); with a critic it's a DYNAMIC WithMaxStepsFunc that
// polls the handle each step (so the critic can raise a long run's budget),
// falling back to base when the handle defers (MaxSteps() <= 0).
func (b *criticBinding) maxStepsOption(base int) agent.Option {
if b == nil {
return agent.WithMaxSteps(base)
}
return agent.WithMaxStepsFunc(func() int {
if n := b.h.MaxSteps(); n > 0 {
return n
}
return base
})
}
// drainSteer returns the critic's queued steer messages (nil-safe), so the
// executor can merge them with the session steer mailbox into one WithSteer.
func (b *criticBinding) drainSteer() []llm.Message {
if b == nil {
return nil
}
return b.h.Steer()
}
+128
View File
@@ -0,0 +1,128 @@
package run_test
import (
"context"
"sync"
"testing"
"time"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
"gitea.stevedudenhoeffer.com/steve/executus/run"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
type fakeCritic struct{ h *fakeCriticHandle }
func (c *fakeCritic) Monitor(_ context.Context, _ run.RunInfo, _ time.Duration) run.CriticHandle {
return c.h
}
type fakeCriticHandle struct {
mu sync.Mutex
steps, tools, stops int
steered int
maxSteps int // 0 => defer to the run's base MaxIterations
killCause error // non-nil simulates a critic kill
}
func (h *fakeCriticHandle) RecordStep(int, *llm.Response) { h.mu.Lock(); h.steps++; h.mu.Unlock() }
func (h *fakeCriticHandle) KillCause() error {
h.mu.Lock()
defer h.mu.Unlock()
return h.killCause
}
func (h *fakeCriticHandle) RecordToolStart(string, string) {
h.mu.Lock()
h.tools++
h.mu.Unlock()
}
func (h *fakeCriticHandle) Steer() []llm.Message { h.mu.Lock(); h.steered++; h.mu.Unlock(); return nil }
func (h *fakeCriticHandle) Deadline() time.Time { return time.Time{} } // no hard deadline
func (h *fakeCriticHandle) MaxSteps() int { h.mu.Lock(); defer h.mu.Unlock(); return h.maxSteps }
func (h *fakeCriticHandle) Stop() { h.mu.Lock(); h.stops++; h.mu.Unlock() }
// TestCriticRaisesStepCeiling: a critic returning a higher MaxSteps lets the agent
// run PAST its base MaxIterations (the dynamic step ceiling). With base=1 and no
// critic the run would hit ErrMaxSteps after the first tool-dispatch step; the
// critic raises it to 5 so the run completes.
func TestCriticRaisesStepCeiling(t *testing.T) {
h := &fakeCriticHandle{maxSteps: 5}
fp := fake.New("fake")
fp.Enqueue("m",
// two tool-call steps (unknown tool → tolerated error results), then answer
fake.ReplyWith(llm.Response{ToolCalls: []llm.ToolCall{{ID: "c1", Name: "noop", Arguments: []byte(`{}`)}}}),
fake.ReplyWith(llm.Response{ToolCalls: []llm.ToolCall{{ID: "c2", Name: "noop", Arguments: []byte(`{}`)}}}),
fake.Reply("done after 2 tool steps"),
)
m, _ := fp.Model("m")
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
Ports: run.Ports{Critic: &fakeCritic{h: h}},
// large soft timeout so the deadline-watch never interferes in the test
Defaults: run.Defaults{CriticSoftTimeout: time.Hour},
})
res := ex.Run(context.Background(),
run.RunnableAgent{Name: "x", ModelTier: "m", MaxIterations: 1, Critic: run.CriticConfig{Enabled: true}},
tool.Invocation{RunID: "r"}, "go")
if res.Err != nil {
t.Fatalf("critic raised the ceiling to 5, run should complete past base=1: %v", res.Err)
}
if res.Output != "done after 2 tool steps" {
t.Errorf("output = %q", res.Output)
}
}
// TestCriticWired: an agent with Critic.Enabled gets monitored — Monitor returns
// a handle the executor feeds (RecordStep), drains (Steer), and stops.
func TestCriticWired(t *testing.T) {
h := &fakeCriticHandle{}
fp := fake.New("fake")
fp.Enqueue("m", fake.Reply("done"))
m, _ := fp.Model("m")
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
Ports: run.Ports{Critic: &fakeCritic{h: h}},
})
res := ex.Run(context.Background(),
run.RunnableAgent{Name: "watched", ModelTier: "m", Critic: run.CriticConfig{Enabled: true}},
tool.Invocation{RunID: "r"}, "go")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
h.mu.Lock()
defer h.mu.Unlock()
if h.steps < 1 {
t.Errorf("critic should have seen >=1 step, got %d", h.steps)
}
if h.steered < 1 {
t.Errorf("critic Steer should be drained at least once, got %d", h.steered)
}
if h.stops != 1 {
t.Errorf("critic Stop should be called exactly once, got %d", h.stops)
}
}
// TestCriticDisabledNotMonitored: Critic.Enabled=false → Monitor never called.
func TestCriticDisabledNotMonitored(t *testing.T) {
h := &fakeCriticHandle{}
fp := fake.New("fake")
fp.Enqueue("m", fake.Reply("done"))
m, _ := fp.Model("m")
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
Ports: run.Ports{Critic: &fakeCritic{h: h}},
})
ex.Run(context.Background(),
run.RunnableAgent{Name: "x", ModelTier: "m"}, // Critic.Enabled=false
tool.Invocation{RunID: "r"}, "go")
h.mu.Lock()
defer h.mu.Unlock()
if h.stops != 0 || h.steps != 0 {
t.Errorf("disabled critic should not be monitored: steps=%d stops=%d", h.steps, h.stops)
}
}
+114
View File
@@ -0,0 +1,114 @@
package run_test
import (
"context"
"errors"
"testing"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
"gitea.stevedudenhoeffer.com/steve/executus/deliver"
"gitea.stevedudenhoeffer.com/steve/executus/run"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
type recordingDelivery struct {
target deliver.Target
output string
errored error
delivers int
}
func (d *recordingDelivery) Deliver(_ context.Context, t deliver.Target, output string, _ []deliver.Artifact) (string, error) {
d.target, d.output, d.delivers = t, output, d.delivers+1
return "msg-1", nil
}
func (d *recordingDelivery) DeliverError(_ context.Context, t deliver.Target, e error) error {
d.target, d.errored = t, e
return nil
}
func TestDeliveryWired(t *testing.T) {
d := &recordingDelivery{}
fp := fake.New("fake")
fp.Enqueue("m", fake.Reply("the output"))
m, _ := fp.Model("m")
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
Ports: run.Ports{Delivery: d},
})
// With a delivery target, the executor posts the output.
ex.Run(context.Background(),
run.RunnableAgent{Name: "x", ModelTier: "m"},
tool.Invocation{RunID: "r", DeliveryKind: "channel", DeliveryID: "chan-9"}, "go")
if d.delivers != 1 || d.output != "the output" || d.target.ID != "chan-9" || d.target.Kind != "channel" {
t.Fatalf("delivery wrong: %+v out=%q", d.target, d.output)
}
}
func TestNoDeliveryWithoutTarget(t *testing.T) {
d := &recordingDelivery{}
fp := fake.New("fake")
fp.Enqueue("m", fake.Reply("x"))
m, _ := fp.Model("m")
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
Ports: run.Ports{Delivery: d},
})
// No DeliveryID → executor delivers nothing (caller reads Result.Output).
ex.Run(context.Background(),
run.RunnableAgent{Name: "x", ModelTier: "m"},
tool.Invocation{RunID: "r"}, "go")
if d.delivers != 0 {
t.Errorf("no target should mean no delivery, got %d", d.delivers)
}
}
// TestNoDeliveryOnEarlyResolveError: an error BEFORE the run starts (model
// resolve) returns before delivery is reached — neither Deliver nor DeliverError
// fires. (Delivery covers run OUTCOMES, not pre-run setup failures.)
func TestNoDeliveryOnEarlyResolveError(t *testing.T) {
d := &recordingDelivery{}
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) {
return ctx, nil, errors.New("resolve boom")
},
Ports: run.Ports{Delivery: d},
})
ex.Run(context.Background(),
run.RunnableAgent{Name: "x", ModelTier: "m"},
tool.Invocation{RunID: "r", DeliveryKind: "channel", DeliveryID: "chan-9"}, "go")
if d.delivers != 0 || d.errored != nil {
t.Errorf("early resolve failure should neither Deliver nor DeliverError: delivers=%d errored=%v", d.delivers, d.errored)
}
}
// TestDeliverErrorOnRunFailure: an in-loop run failure (the model errors) routes
// through DeliverError with the run error.
func TestDeliverErrorOnRunFailure(t *testing.T) {
d := &recordingDelivery{}
fp := fake.New("fake")
fp.Enqueue("m", fake.Step{Err: errors.New("model boom")}) // model errors mid-run
m, _ := fp.Model("m")
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
Ports: run.Ports{Delivery: d},
})
res := ex.Run(context.Background(),
run.RunnableAgent{Name: "x", ModelTier: "m"},
tool.Invocation{RunID: "r", DeliveryKind: "channel", DeliveryID: "chan-9"}, "go")
if res.Err == nil {
t.Fatal("expected a run error")
}
if d.delivers != 0 {
t.Errorf("a failed run should not Deliver (success path), got %d", d.delivers)
}
if d.errored == nil || d.target.ID != "chan-9" {
t.Errorf("a failed run with a target should DeliverError to chan-9, got errored=%v target=%+v", d.errored, d.target)
}
}
+172 -22
View File
@@ -4,12 +4,14 @@ import (
"context"
"errors"
"fmt"
"strings"
"time"
"gitea.stevedudenhoeffer.com/steve/majordomo/agent"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/executus/compact"
"gitea.stevedudenhoeffer.com/steve/executus/deliver"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
@@ -27,6 +29,7 @@ type Defaults struct {
MaxConsecutiveToolErrors int // loop guard; default 3
MaxSameToolCallRepeats int // retry-storm guard; default 3
CompactionThresholdRatio float64 // fraction of model context to compact at; default 0.7
CriticSoftTimeout time.Duration // idle window before the critic wakes; default 90s
}
func (d Defaults) withFallbacks() Defaults {
@@ -48,6 +51,9 @@ func (d Defaults) withFallbacks() Defaults {
if d.CompactionThresholdRatio <= 0 {
d.CompactionThresholdRatio = 0.7
}
if d.CriticSoftTimeout <= 0 {
d.CriticSoftTimeout = 90 * time.Second
}
return d
}
@@ -96,13 +102,26 @@ type Result struct {
Steps []tool.Step
Usage llm.Usage
Err error
// PostRunResult carries artifacts produced by a SessionToolFactory's PostRun
// hook (rendered images, files). nil when no factory was set or PostRun
// returned nil. The host delivers these (e.g. mort's chat API / Discord).
PostRunResult *tool.PostRunResult
}
// Run executes ra with the given invocation + input and returns the Result. It
// never propagates a panic; failures surface in Result.Err.
func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocation, input string) Result {
// never propagates a panic; failures surface in Result.Err (a top-level recover
// converts any panic — including from a host Port — into a run error).
func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocation, input string) (res Result) {
started := time.Now()
res := Result{RunID: inv.RunID}
res = Result{RunID: inv.RunID}
// Enforce the no-panic contract: a panic anywhere in the run (incl. a host
// Critic/Audit/Palette callback on the main goroutine) becomes Result.Err
// rather than unwinding into the caller.
defer func() {
if r := recover(); r != nil {
res.Err = fmt.Errorf("run.Executor: recovered panic: %v", r)
}
}()
tier := ra.ModelTier
if tier == "" {
@@ -141,25 +160,33 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
// Audit start (optional). The recorder satisfies RunTally; stamp it on the
// invocation so a self-status tool can read live progress.
info := RunInfo{
RunID: inv.RunID,
SubjectID: ra.ID,
Name: ra.Name,
CallerID: inv.CallerID,
ChannelID: inv.ChannelID,
ParentRunID: inv.ParentRunID,
Inputs: inv.SkillInputs,
StartedAt: started,
MaxIterations: maxIter,
}
var rec RunRecorder
var stateAcc *RunStateAccessor
if e.cfg.Ports.Audit != nil {
rec = e.cfg.Ports.Audit.StartRun(ctx, RunInfo{
RunID: inv.RunID,
SubjectID: ra.ID,
Name: ra.Name,
CallerID: inv.CallerID,
ChannelID: inv.ChannelID,
ParentRunID: inv.ParentRunID,
Inputs: inv.SkillInputs,
StartedAt: started,
})
rec = e.cfg.Ports.Audit.StartRun(ctx, info)
}
if rec != nil {
stateAcc = NewRunStateAccessor(rec, maxIter, 0, started)
inv.RunState = stateAcc
}
// Steer mailbox: lets session tools (via inv.AttachImages) feed multimodal
// messages into the running conversation before its next step. Created BEFORE
// the toolbox build so any tool's handler captures the live AttachImages seam.
mailbox := &steerMailbox{}
inv.AttachImages = (&runSession{mailbox: mailbox}).AttachImages
// Build the toolbox from the agent's low-level tools.
toolbox, err := e.cfg.Registry.Build(ra.LowLevelTools, inv, tool.Visibility("private"), nil)
if err != nil {
@@ -168,16 +195,66 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
return res
}
// Add skill__/agent__ delegation tools from the agent's palette (nil-safe:
// no PaletteSource or empty palette → no delegation tools).
if err := addDelegationTools(toolbox, ra, inv, e.cfg.Ports.Palette); err != nil {
res.Err = fmt.Errorf("build delegation tools: %w", err)
e.finishAudit(ctx, rec, "error", res, started, res.Err)
return res
}
// Per-invocation ExtraTools + a SessionToolFactory's per-run tools, added on
// top of the agent's palette. The factory closes over the live session (the
// AttachImages mailbox); its PostRun hook (held for after the run) produces
// artifacts attached to res.PostRunResult, and its Cleanup is deferred. All
// nil-safe.
for _, t := range inv.ExtraTools {
if err := toolbox.Add(t); err != nil {
res.Err = fmt.Errorf("add extra tool: %w", err)
e.finishAudit(ctx, rec, "error", res, started, res.Err)
return res
}
}
var postRun func(ctx context.Context, transcript []llm.Message, output string, runErr error) *tool.PostRunResult
if inv.SessionToolFactory != nil {
st := inv.SessionToolFactory(&runSession{mailbox: mailbox})
if st.Cleanup != nil {
defer safeCleanup(st.Cleanup) // panic-isolated, like runPostRun
}
for _, t := range st.Tools {
if err := toolbox.Add(t); err != nil {
res.Err = fmt.Errorf("add session tool: %w", err)
e.finishAudit(ctx, rec, "error", res, started, res.Err)
return res
}
}
postRun = st.PostRun
}
// Run context: bound by MaxRuntime, detached from the caller's deadline so a
// lane/queue wait doesn't eat the run budget (mort's V10 lesson). Caller
// cancellation still propagates via MergeCancellation. Created BEFORE the
// step observer so the observer forwards the merged run context (not a
// possibly-cancelled caller ctx) to OnStep consumers.
runCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), maxRuntime)
defer cancel()
// MaxRuntime stays a WithTimeout so its DeadlineExceeded propagates through the
// child chain (→ "timeout"), preserving the run's-own-timeout vs caller-cancel
// distinction. A NESTED cause-carrying layer lets a critic kill surface as a
// distinct "killed" without disturbing that: only an ErrCriticKill cause is
// consulted in statusFor; a generic run error or a caller cancel is classified
// by the run error itself.
timeoutCtx, cancelTimeout := context.WithTimeout(context.WithoutCancel(ctx), maxRuntime)
defer cancelTimeout()
runCtx, cancelCause := context.WithCancelCause(timeoutCtx)
defer cancelCause(nil)
runCtx, mergeCancel := MergeCancellation(runCtx, ctx)
defer mergeCancel()
// Critic (optional): monitors the run for a stall, can nudge/extend/kill via
// its host Escalator. Its hard deadline is bound to runCtx (cancel on pass).
// nil-safe: no-op when no critic is configured or the agent doesn't enable it.
critic, stopCritic := e.startCritic(runCtx, cancelCause, ra, info)
defer stopCritic()
// Step instrumentation: accumulate Result.Steps + fire inv.OnStep, feed the
// audit recorder, and keep the live iteration counter fresh. majordomo's
// step observer hands us each completed iteration; we zip the model's tool
@@ -192,6 +269,7 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
if rec != nil {
rec.OnStep(s.Index, s.Response)
}
critic.recordStep(s.Index, s.Response) // keep the critic's activity clock fresh + carry the step payload
var calls []llm.ToolCall
if s.Response != nil {
calls = s.Response.ToolCalls
@@ -202,6 +280,7 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
}
for i := 0; i < n; i++ {
call, r := calls[i], s.Results[i]
critic.recordToolStart(call.Name, string(call.Arguments))
emitter.toolStart(runCtx, call.Name, call.Arguments)
emitter.toolEnd(runCtx, call, r.Content, r.IsError)
if rec != nil {
@@ -212,7 +291,10 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
opts := []agent.Option{
agent.WithToolbox(toolbox),
agent.WithMaxSteps(maxIter),
// Step ceiling: a fixed WithMaxSteps(maxIter) normally, but when a critic is
// active it owns a DYNAMIC ceiling (WithMaxStepsFunc) so it can raise a
// healthy-but-long run's budget mid-flight. Falls back to maxIter.
critic.maxStepsOption(maxIter),
agent.WithToolErrorLimits(e.cfg.Defaults.MaxConsecutiveToolErrors, e.cfg.Defaults.MaxSameToolCallRepeats),
agent.WithStepObserver(stepObserver),
}
@@ -236,9 +318,12 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
}
ag := agent.New(model, e.systemPrompt(ra), opts...)
runRes, runErr := ag.Run(runCtx, input)
// One WithSteer drains BOTH the session mailbox (a tool's AttachImages) and
// the critic's nudges before each step.
steer := func() []llm.Message { return append(mailbox.drain(), critic.drainSteer()...) }
runRes, runErr := runAgent(runCtx, ag, input, inv.Images, agent.WithSteer(steer))
status := statusFor(runErr)
status := statusFor(runCtx, runErr)
if runRes != nil {
res.Output = runRes.Output
res.Usage = runRes.Usage
@@ -246,20 +331,43 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
res.Steps = emitter.snapshot()
res.Err = runErr
// PostRun: hand the SessionToolFactory's hook the full transcript (populated
// even on partial results) so it can produce artifacts. Best-effort +
// panic-isolated — a PostRun failure never fails an otherwise-successful run.
if postRun != nil {
var transcript []llm.Message
if runRes != nil {
transcript = runRes.Messages
}
// Detach from the caller's ctx: a finished/cancelled caller must not abort
// artifact production (the hook owns its own bounding, per its contract).
res.PostRunResult = runPostRun(detach(ctx), postRun, transcript, res.Output, runErr)
}
e.finishAudit(ctx, rec, status, res, started, runErr)
if e.cfg.Ports.Budget != nil {
e.cfg.Ports.Budget.Commit(detach(ctx), inv.CallerID, time.Since(started).Seconds())
}
e.deliver(ctx, inv, res, runErr)
return res
}
// statusFor maps a run error to a RunStats.Status, distinguishing a deadline
// (timeout) and a cancellation (cancelled — caller cancel or shutdown) from a
// generic error so audit consumers can tell them apart.
func statusFor(runErr error) string {
// statusFor maps a run error to a RunStats.Status, distinguishing a critic kill
// (killed), a deadline (timeout), and a cancellation (cancelled — caller cancel
// or shutdown) from a generic error so audit consumers can tell them apart. The
// run context's cancellation cause carries the distinction (ErrCriticKill /
// DeadlineExceeded), since ctx.Err() alone only reports Canceled.
func statusFor(runCtx context.Context, runErr error) string {
switch {
case runErr == nil:
return "ok"
// Only the kill is recovered from the cancellation cause — a critic kill
// surfaces as a plain Canceled run error, so without this it'd read as
// "cancelled". Everything else is classified by the run error itself, so a
// genuine run error is never relabeled just because the context was later
// cancelled, and a caller cancel/deadline stays "cancelled" (not "timeout").
case errors.Is(context.Cause(runCtx), ErrCriticKill):
return "killed"
case errors.Is(runErr, context.DeadlineExceeded):
return "timeout"
case errors.Is(runErr, context.Canceled):
@@ -308,6 +416,23 @@ func (e *Executor) compactionThreshold(tier string) int {
return int(float64(max) * e.cfg.Defaults.CompactionThresholdRatio)
}
// deliver posts the run's output (or error) via run.Ports.Delivery when both a
// Delivery and a target (inv.DeliveryID) are set. No target = the caller reads
// Result.Output itself (the synchronous default). Best-effort + detached: a
// delivery failure must not change the run's outcome.
func (e *Executor) deliver(ctx context.Context, inv tool.Invocation, res Result, runErr error) {
if e.cfg.Ports.Delivery == nil || inv.DeliveryID == "" {
return
}
target := deliver.Target{Kind: inv.DeliveryKind, ID: inv.DeliveryID}
dctx := detach(ctx)
if runErr != nil {
_ = e.cfg.Ports.Delivery.DeliverError(dctx, target, runErr)
return
}
_, _ = e.cfg.Ports.Delivery.Deliver(dctx, target, res.Output, nil)
}
// detach derives a bounded cleanup context off ctx, detached from its
// cancellation, for post-run writes. The cancel is intentionally not returned;
// CleanupContextTimeout bounds the lifetime.
@@ -316,3 +441,28 @@ func detach(ctx context.Context) context.Context {
_ = cancel // bounded by the timeout; nothing to cancel early
return c
}
// runAgent dispatches the majordomo agent loop. majordomo's Run takes a text-only
// input arg, so when the invocation carries images they're folded into the first
// user message (text + image parts) via WithHistory and Run is called with an
// empty input — the model then sees a multimodal opening turn. The image-less path
// passes the prompt straight through.
//
// The text part is omitted when input is blank (image-only run), matching
// runSession.AttachImages so no empty TextPart is sent.
func runAgent(ctx context.Context, ag *agent.Agent, input string, images []llm.ImagePart, opts ...agent.RunOption) (*agent.Result, error) {
if len(images) == 0 {
return ag.Run(ctx, input, opts...)
}
parts := make([]llm.Part, 0, len(images)+1)
if strings.TrimSpace(input) != "" {
parts = append(parts, llm.Text(input))
}
for _, img := range images {
parts = append(parts, img)
}
// Copy opts before appending so a caller-supplied backing array is never
// mutated/aliased (the variadic slice can have spare capacity).
opts = append(opts[:len(opts):len(opts)], agent.WithHistory([]llm.Message{llm.UserParts(parts...)}))
return ag.Run(ctx, "", opts...)
}
+20 -7
View File
@@ -148,20 +148,33 @@ func TestExecutorNilModelNoPanic(t *testing.T) {
}
}
// TestStatusFor maps run errors to RunStats.Status (gadfly F3).
// TestStatusFor maps run errors + cancellation cause to RunStats.Status (gadfly F3).
func TestStatusFor(t *testing.T) {
bg := context.Background()
// A context cancelled with the critic-kill cause: ctx.Err() is Canceled, but
// context.Cause carries ErrCriticKill → "killed".
killCtx, killCancel := context.WithCancelCause(context.Background())
killCancel(fmt.Errorf("%w: hung", ErrCriticKill))
// A context cancelled with a non-kill cause must NOT relabel a genuine run
// error: a real error stays "error" even though the ctx was later cancelled.
cancelledCtx, cc := context.WithCancelCause(context.Background())
cc(context.DeadlineExceeded)
cases := []struct {
ctx context.Context
err error
want string
}{
{nil, "ok"},
{context.DeadlineExceeded, "timeout"},
{context.Canceled, "cancelled"},
{fmt.Errorf("wrapped: %w", context.DeadlineExceeded), "timeout"},
{errors.New("boom"), "error"},
{bg, nil, "ok"},
{bg, context.DeadlineExceeded, "timeout"},
{bg, context.Canceled, "cancelled"},
{bg, fmt.Errorf("wrapped: %w", context.DeadlineExceeded), "timeout"},
{bg, errors.New("boom"), "error"},
{killCtx, context.Canceled, "killed"},
{cancelledCtx, errors.New("boom"), "error"}, // generic error not relabeled by cause
{cancelledCtx, context.Canceled, "cancelled"}, // caller cancel stays cancelled, not timeout
}
for _, c := range cases {
if got := statusFor(c.err); got != c.want {
if got := statusFor(c.ctx, c.err); got != c.want {
t.Errorf("statusFor(%v) = %q, want %q", c.err, got, c.want)
}
}
+121
View File
@@ -0,0 +1,121 @@
package run_test
import (
"context"
"strings"
"testing"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
"gitea.stevedudenhoeffer.com/steve/executus/run"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
// TestExecutorFoldsInitialImages: when the invocation carries Images, they're
// folded into the first user message (alongside the prompt text) instead of being
// dropped — majordomo's Run input arg is text-only, so the executor seeds the
// multimodal opening turn via history.
func TestExecutorFoldsInitialImages(t *testing.T) {
fp := fake.New("fake")
fp.Enqueue("m", fake.Reply("saw the image"))
m, _ := fp.Model("m")
img := llm.ImagePart{MIME: "image/png", Data: []byte("PNGDATA")}
inv := tool.Invocation{RunID: "r1", Images: []llm.ImagePart{img}}
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
})
res := ex.Run(context.Background(), run.RunnableAgent{ModelTier: "m"}, inv, "describe this")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
calls := fp.Calls()
if len(calls) == 0 {
t.Fatal("no model calls recorded")
}
// The text + image must be CO-LOCATED in a single user message (not split
// across two), so the model reads them as one multimodal turn.
coLocated := false
for _, msg := range calls[0].Request.Messages {
sawImage, sawText := false, false
for _, p := range msg.Parts {
switch pp := p.(type) {
case llm.ImagePart:
if string(pp.Data) == "PNGDATA" {
sawImage = true
}
case llm.TextPart:
if strings.Contains(pp.Text, "describe this") {
sawText = true
}
}
}
if sawImage && sawText {
coLocated = true
}
}
if !coLocated {
t.Error("image + prompt text were not folded into the SAME user message")
}
}
// TestExecutorImageOnlyNoBlankText: an image-only run (blank prompt) must NOT emit
// an empty TextPart — the message carries just the image, matching
// runSession.AttachImages's guard.
func TestExecutorImageOnlyNoBlankText(t *testing.T) {
fp := fake.New("fake")
fp.Enqueue("m", fake.Reply("saw it"))
m, _ := fp.Model("m")
inv := tool.Invocation{RunID: "r3", Images: []llm.ImagePart{{MIME: "image/png", Data: []byte("IMG")}}}
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
})
res := ex.Run(context.Background(), run.RunnableAgent{ModelTier: "m"}, inv, " ")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
for _, msg := range fp.Calls()[0].Request.Messages {
for _, p := range msg.Parts {
if tp, ok := p.(llm.TextPart); ok && strings.TrimSpace(tp.Text) == "" {
t.Error("image-only run emitted a blank TextPart")
}
}
}
}
// TestExecutorTextOnlyUnchanged: with no Images, the prompt flows through as the
// text input (regression guard that the fold path didn't break the common case).
func TestExecutorTextOnlyUnchanged(t *testing.T) {
fp := fake.New("fake")
fp.Enqueue("m", fake.Reply("ok"))
m, _ := fp.Model("m")
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
})
res := ex.Run(context.Background(), run.RunnableAgent{ModelTier: "m"}, tool.Invocation{RunID: "r2"}, "plain prompt")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
calls := fp.Calls()
if len(calls) == 0 {
t.Fatal("no model calls recorded")
}
sawText := false
for _, msg := range calls[0].Request.Messages {
for _, p := range msg.Parts {
if tp, ok := p.(llm.TextPart); ok && strings.Contains(tp.Text, "plain prompt") {
sawText = true
}
}
}
if !sawText {
t.Error("text-only prompt did not reach the model")
}
}
+102
View File
@@ -0,0 +1,102 @@
package run
import (
"context"
"fmt"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
// addDelegationTools adds a delegation tool to the toolbox for each
// SkillPalette / SubAgentPalette entry, backed by the PaletteSource:
//
// - skill__<name> invokes the named saved skill with structured inputs.
// - agent__<name> invokes the named sub-agent with a prompt.
//
// Each delegated call runs as a CHILD of the current run (parentRunID =
// inv.RunID), inheriting the caller + channel. No-op when palette is nil or both
// palettes are empty — so an agent with no palette (or a host with no
// PaletteSource) simply has no delegation tools, exactly as before.
func addDelegationTools(box *llm.Toolbox, ra RunnableAgent, inv tool.Invocation, palette PaletteSource) error {
if palette == nil {
return nil
}
seen := map[string]bool{} // dedupe across both palettes by final tool name
for _, name := range ra.SkillPalette {
name := name // capture
toolName := "skill__" + name
if name == "" || seen[toolName] { // skip empty / duplicate palette entries
continue
}
seen[toolName] = true
t := llm.DefineTool(
toolName,
fmt.Sprintf("Delegate the task to the %q skill. Provide its declared inputs.", name),
func(ctx context.Context, args skillDelegateArgs) (any, error) {
out, _, status, err := palette.InvokeSkill(ctx, inv.CallerID, inv.ChannelID, name, args.Inputs, inv.RunID)
if err != nil {
return nil, delegationErr("skill", name, out, err)
}
return delegationResult(name, "skill", out, status), nil
},
)
if err := box.Add(t); err != nil {
return fmt.Errorf("add %s: %w", toolName, err)
}
}
for _, name := range ra.SubAgentPalette {
name := name // capture
toolName := "agent__" + name
if name == "" || seen[toolName] {
continue
}
seen[toolName] = true
t := llm.DefineTool(
toolName,
fmt.Sprintf("Delegate the task to the %q sub-agent with a natural-language prompt.", name),
func(ctx context.Context, args agentDelegateArgs) (any, error) {
out, _, status, err := palette.InvokeAgent(ctx, inv.CallerID, inv.ChannelID, name, args.Prompt, inv.RunID, "", "", nil, nil)
if err != nil {
return nil, delegationErr("agent", name, out, err)
}
return delegationResult(name, "agent", out, status), nil
},
)
if err := box.Add(t); err != nil {
return fmt.Errorf("add %s: %w", toolName, err)
}
}
return nil
}
// delegationResult surfaces a non-ok child status to the parent agent (so it can
// react to a timeout/cancel/budget stop) while still passing the partial output.
func delegationResult(name, kind, out, status string) string {
if status != "" && status != "ok" {
header := fmt.Sprintf("[%s %q ended with status %q]", kind, name, status)
if out == "" { // no trailing blank line when there's no body
return header
}
return header + "\n" + out
}
return out
}
// delegationErr wraps a hard delegation failure, folding in any partial output
// the child produced before failing (so it isn't silently lost).
func delegationErr(kind, name, partial string, err error) error {
if partial != "" {
return fmt.Errorf("%s %q failed (partial output: %q): %w", kind, name, partial, err)
}
return fmt.Errorf("%s %q failed: %w", kind, name, err)
}
type skillDelegateArgs struct {
Inputs map[string]any `json:"inputs" description:"Inputs for the skill, matching its declared input schema."`
}
type agentDelegateArgs struct {
Prompt string `json:"prompt" description:"The task/prompt to hand the sub-agent."`
}
+125
View File
@@ -0,0 +1,125 @@
package run_test
import (
"context"
"encoding/json"
"testing"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
"gitea.stevedudenhoeffer.com/steve/executus/run"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
// recordingPalette captures the delegation call it received.
type recordingPalette struct {
gotName, gotCaller, gotParent string
gotInputs map[string]any
}
func (p *recordingPalette) ResolveSkill(context.Context, string, string) (string, error) {
return "", nil
}
func (p *recordingPalette) InvokeSkill(_ context.Context, callerID, _, name string, inputs map[string]any, parentRunID string) (string, string, string, error) {
p.gotName, p.gotCaller, p.gotParent, p.gotInputs = name, callerID, parentRunID, inputs
return "the skill output", "child-run-1", "ok", nil
}
func (p *recordingPalette) ResolveAgent(context.Context, string, string) (string, error) {
return "", nil
}
func (p *recordingPalette) InvokeAgent(context.Context, string, string, string, string, string, string, string, []string, func(context.Context, string, string)) (string, string, string, error) {
return "", "", "ok", nil
}
// TestPaletteDelegation: an agent with a SkillPalette gets a skill__<name> tool;
// the model calls it, the executor routes it through run.Ports.Palette as a
// child of the current run, and the result flows back into the loop.
func TestPaletteDelegation(t *testing.T) {
pal := &recordingPalette{}
fp := fake.New("fake")
fp.Enqueue("m",
fake.ReplyWith(llm.Response{ToolCalls: []llm.ToolCall{{
ID: "c1",
Name: "skill__helper",
Arguments: json.RawMessage(`{"inputs":{"q":"hi"}}`),
}}}),
fake.Reply("delegated and done"),
)
m, err := fp.Model("m")
if err != nil {
t.Fatal(err)
}
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
Ports: run.Ports{Palette: pal},
})
res := ex.Run(context.Background(),
run.RunnableAgent{ID: "a1", Name: "boss", ModelTier: "m", SkillPalette: []string{"helper"}},
tool.Invocation{RunID: "parent-run", CallerID: "caller-7", ChannelID: "chan"},
"delegate please")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
if res.Output != "delegated and done" {
t.Errorf("output = %q", res.Output)
}
if pal.gotName != "helper" {
t.Errorf("InvokeSkill name = %q, want helper", pal.gotName)
}
if pal.gotCaller != "caller-7" {
t.Errorf("InvokeSkill caller = %q, want caller-7", pal.gotCaller)
}
if pal.gotParent != "parent-run" {
t.Errorf("InvokeSkill parentRunID = %q, want parent-run (child of the current run)", pal.gotParent)
}
if pal.gotInputs["q"] != "hi" {
t.Errorf("InvokeSkill inputs = %+v, want q=hi", pal.gotInputs)
}
}
// TestNoPaletteNoDelegationTools: nil PaletteSource → no delegation tools, run
// still works (the agent just has no skill__/agent__ tools).
func TestNoPaletteNoDelegationTools(t *testing.T) {
fp := fake.New("fake")
fp.Enqueue("m", fake.Reply("ok"))
m, _ := fp.Model("m")
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
})
res := ex.Run(context.Background(),
run.RunnableAgent{Name: "x", ModelTier: "m", SkillPalette: []string{"helper"}},
tool.Invocation{RunID: "r"}, "hi")
if res.Err != nil || res.Output != "ok" {
t.Fatalf("nil-palette run failed: %v / %q", res.Err, res.Output)
}
}
// TestDelegationDedupeAndEmptySkip: empty + duplicate palette names are skipped,
// not turned into "skill__"/duplicate tools that error at box.Add (gadfly C0).
func TestDelegationDedupeAndEmptySkip(t *testing.T) {
pal := &recordingPalette{}
fp := fake.New("fake")
fp.Enqueue("m", fake.Reply("ok"))
m, _ := fp.Model("m")
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
Ports: run.Ports{Palette: pal},
})
// "" (empty) and a duplicate "helper" must not break the build.
res := ex.Run(context.Background(),
run.RunnableAgent{Name: "x", ModelTier: "m", SkillPalette: []string{"helper", "", "helper"}},
tool.Invocation{RunID: "r"}, "hi")
if res.Err != nil {
t.Fatalf("empty/duplicate palette names should be skipped, not error: %v", res.Err)
}
if res.Output != "ok" {
t.Fatalf("output = %q", res.Output)
}
}
+30 -2
View File
@@ -2,6 +2,7 @@ package run
import (
"context"
"errors"
"time"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
@@ -9,6 +10,12 @@ import (
"gitea.stevedudenhoeffer.com/steve/executus/deliver"
)
// ErrCriticKill is the cancellation cause the executor stamps on a run the
// critic kills, so a critic kill surfaces as a distinct "killed" status (vs a
// backstop "timeout" or a caller "cancelled"). A host CriticHandle signals a
// kill via KillCause(); the executor wraps that reason with this sentinel.
var ErrCriticKill = errors.New("run: critic killed the run")
// Ports are the host seams the run executor consumes. Every field is nil-safe:
// a light host passes the zero Ports and gets a bounded, in-memory run with no
// persistence, audit, budget, critic, delegation, or delivery — which is
@@ -48,6 +55,9 @@ type RunInfo struct {
ParentRunID string
Inputs map[string]any
StartedAt time.Time
// MaxIterations is the run's base tool-dispatch step ceiling, so a critic can
// raise it relative to the baseline (see CriticHandle.MaxSteps).
MaxIterations int
}
// RunStats is the terminal roll-up a recorder's Close writes. Mirrors mort's
@@ -113,10 +123,17 @@ type Critic interface {
}
// CriticHandle is the executor's live link to a run's critic.
//
// Concurrency: the executor calls RecordStep/RecordToolStart/Steer from the run
// goroutine while a separate watch goroutine polls Deadline() and the run's end
// calls Stop() — so implementations MUST be safe for concurrent use across these
// methods (the critic battery's handle guards its state with a mutex).
type CriticHandle interface {
// RecordStep / RecordToolStart keep the critic's activity clock fresh so a
// healthy-but-slow run is not mistaken for a hang.
RecordStep(iter int)
// healthy-but-slow run is not mistaken for a hang. RecordStep also carries the
// completed step's model response (nil-safe) so the critic's Trace can show
// what the agent actually produced, not just an iteration count.
RecordStep(iter int, resp *llm.Response)
RecordToolStart(name, args string)
// Steer returns any messages the critic wants injected into the loop (a
// nudge), drained before each step — matches majordomo agent.WithSteer.
@@ -124,6 +141,17 @@ type CriticHandle interface {
// Deadline returns the current hard-kill deadline (the critic may extend
// it); the executor binds the run context to it. Zero = no hard deadline.
Deadline() time.Time
// MaxSteps returns the current tool-dispatch step ceiling, polled by the
// executor each step (via majordomo WithMaxStepsFunc) so a critic can raise a
// healthy-but-long run's iteration budget mid-flight. Return <= 0 to defer to
// the run's base MaxIterations.
MaxSteps() int
// KillCause returns a non-nil reason iff the critic has decided to KILL this
// run (as opposed to letting the hard-deadline backstop expire). The executor
// reads it when the deadline passes: non-nil → cancel the run with
// ErrCriticKill (status "killed"); nil → the backstop expired naturally
// (status "timeout"). Hosts that never distinguish the two may return nil.
KillCause() error
// Stop ends monitoring when the run finishes.
Stop()
}
+88
View File
@@ -0,0 +1,88 @@
package run
import (
"context"
"log/slog"
"strings"
"sync"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
// runPostRun invokes a SessionToolFactory's PostRun hook with panic isolation:
// a PostRun panic (or a slow artifact build that the hook mishandles) must not
// fail an otherwise-successful run — artifacts are best-effort, the agent's text
// output is the source of truth.
func runPostRun(ctx context.Context,
hook func(context.Context, []llm.Message, string, error) *tool.PostRunResult,
transcript []llm.Message, output string, runErr error) (prr *tool.PostRunResult) {
defer func() {
if r := recover(); r != nil {
slog.Error("run: PostRun hook panicked; no artifacts produced", "panic", r)
prr = nil
}
}()
return hook(ctx, transcript, output, runErr)
}
// steerMailbox is a thread-safe queue of messages a session tool (via
// tool.Invocation.AttachImages) wants injected into the agent loop before its
// next step — the same WithSteer mechanism the critic uses for nudges, exposed
// to ordinary tools so they can show the model content (e.g. a rendered
// preview) it must SEE, not just be told about.
type steerMailbox struct {
mu sync.Mutex
msgs []llm.Message
}
func (m *steerMailbox) push(msg llm.Message) {
m.mu.Lock()
m.msgs = append(m.msgs, msg)
m.mu.Unlock()
}
// drain returns and clears the queued messages (nil when empty).
func (m *steerMailbox) drain() []llm.Message {
m.mu.Lock()
defer m.mu.Unlock()
if len(m.msgs) == 0 {
return nil
}
out := m.msgs
m.msgs = nil
return out
}
// runSession implements tool.AgentSession over a steer mailbox: AttachImages
// queues a user-role multimodal message the agent loop injects before its next
// step. Replaces legacy agentkit's Agent.AttachImages — majordomo's *agent.Agent
// is immutable mid-run, so mutation flows through the run-scoped steer mailbox.
type runSession struct{ mailbox *steerMailbox }
func (s *runSession) AttachImages(text string, images ...llm.ImagePart) {
parts := make([]llm.Part, 0, len(images)+1)
if strings.TrimSpace(text) != "" {
parts = append(parts, llm.Text(text))
}
for _, img := range images {
parts = append(parts, img)
}
if len(parts) == 0 {
return
}
s.mailbox.push(llm.UserParts(parts...))
}
// safeCleanup runs a SessionTools.Cleanup with panic isolation, so a misbehaving
// teardown (temp-dir removal, handle close) can't clobber an otherwise-successful
// run via the executor's top-level recover.
func safeCleanup(fn func()) {
defer func() {
if r := recover(); r != nil {
slog.Error("run: session Cleanup panicked", "panic", r)
}
}()
fn()
}
+94
View File
@@ -0,0 +1,94 @@
package run_test
import (
"context"
"testing"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
"gitea.stevedudenhoeffer.com/steve/executus/run"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
// TestSessionToolFactoryPostRun: a SessionToolFactory's PostRun hook produces an
// artifact (from the run output + transcript) that lands on Result.PostRunResult,
// and its Cleanup is deferred.
func TestSessionToolFactoryPostRun(t *testing.T) {
fp := fake.New("fake")
fp.Enqueue("m", fake.Reply("hello artifacts"))
m, _ := fp.Model("m")
cleanupCalled := false
inv := tool.Invocation{
RunID: "r1",
SessionToolFactory: func(_ tool.AgentSession) tool.SessionTools {
return tool.SessionTools{
PostRun: func(_ context.Context, transcript []llm.Message, output string, _ error) *tool.PostRunResult {
return &tool.PostRunResult{
Artifacts: []tool.Artifact{{Name: "out.txt", MimeType: "text/plain", Data: []byte(output)}},
Metadata: map[string]any{"transcript_len": len(transcript)},
}
},
Cleanup: func() { cleanupCalled = true },
}
},
}
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
})
res := ex.Run(context.Background(), run.RunnableAgent{ModelTier: "m"}, inv, "go")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
if res.PostRunResult == nil {
t.Fatal("Result.PostRunResult is nil — PostRun hook not invoked / not attached")
}
if n := len(res.PostRunResult.Artifacts); n != 1 {
t.Fatalf("artifacts = %d, want 1", n)
}
a := res.PostRunResult.Artifacts[0]
if a.Name != "out.txt" || string(a.Data) != "hello artifacts" {
t.Errorf("artifact = {%q, %q}", a.Name, string(a.Data))
}
if tl, _ := res.PostRunResult.Metadata["transcript_len"].(int); tl < 1 {
t.Errorf("transcript not passed to PostRun (len=%d)", tl)
}
if !cleanupCalled {
t.Error("Cleanup was not deferred/called")
}
}
// TestSessionToolFactoryAddsTool: tools the factory returns join the run's
// toolbox and are callable by the model.
func TestSessionToolFactoryAddsTool(t *testing.T) {
fp := fake.New("fake")
fp.Enqueue("m",
fake.ReplyWith(llm.Response{ToolCalls: []llm.ToolCall{{ID: "c1", Name: "render", Arguments: []byte(`{}`)}}}),
fake.Reply("rendered"),
)
m, _ := fp.Model("m")
toolCalled := false
renderTool := llm.DefineTool("render", "render a preview",
func(_ context.Context, _ struct{}) (any, error) { toolCalled = true; return "ok", nil })
inv := tool.Invocation{
RunID: "r2",
SessionToolFactory: func(_ tool.AgentSession) tool.SessionTools {
return tool.SessionTools{Tools: []llm.Tool{renderTool}}
},
}
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
})
res := ex.Run(context.Background(),
run.RunnableAgent{ModelTier: "m", MaxIterations: 5}, inv, "go")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
if !toolCalled {
t.Error("session-factory tool was not added to the toolbox / not called")
}
}
+7
View File
@@ -173,6 +173,13 @@ type Invocation struct {
CallerID string
ChannelID string
GuildID string
// DeliveryKind / DeliveryID name where the executor posts the run's output
// via run.Ports.Delivery — a host-interpreted Target ("channel"/"dm"/
// "thread"/...). An empty DeliveryID means the executor delivers nothing
// and the caller reads Result.Output itself (the synchronous default; the
// `.agent run` canary works this way).
DeliveryKind string
DeliveryID string
// CallerIsAdmin is true when the caller is a mort admin (Member.Admin).
// Populated by the executor at run dispatch via Bot.GetMember; defaults
// to false on any lookup failure (member not found, DB error, empty