Compare commits
19 Commits
1a1d5e417b
..
v0.1.3
| Author | SHA1 | Date | |
|---|---|---|---|
| add8f847a4 | |||
| df4033f42e | |||
| 1e65f4b6e5 | |||
| 2ef88f2a73 | |||
| 7a5eebc468 | |||
| 7211ce227c | |||
| f367796244 | |||
| 0acaa8c9a5 | |||
| a35c176b42 | |||
| 1cf46c9954 | |||
| 56baac758d | |||
| 5779035722 | |||
| 1a2a2364ec | |||
| c08ce47fa6 | |||
| 784d5d7ce4 | |||
| 4e179259de | |||
| 82a816ae29 | |||
| be4bbbcad5 | |||
| 390e6cf905 |
@@ -1,11 +1,8 @@
|
||||
# Gadfly — agentic adversarial PR reviewer (https://gitea.stevedudenhoeffer.com/steve/gadfly).
|
||||
#
|
||||
# Runs the published Gadfly image (pinned to an immutable :sha- tag — act_runner
|
||||
# caches :latest, and this build is what carries foreman provider-type support)
|
||||
# as a specialist swarm and posts
|
||||
# ONE consolidated review comment as gitea-actions. Advisory only — never blocks a
|
||||
# merge. This reviews executus PRs with 3 ollama-cloud models (3-lens suite). Gadfly
|
||||
# is a simple system — findings are advisory; always double-check before acting.
|
||||
# Gadfly adversarial review — subscribes to steve/gadfly's reusable workflow and
|
||||
# INHERITS its default swarm. This stub holds only the triggers, the actor gate,
|
||||
# secret forwarding, and the allow-list; the swarm config (models, lenses,
|
||||
# concurrency, timeouts) lives centrally in gadfly's review-reusable.yml so it is
|
||||
# tuned in ONE place. Advisory only — never blocks a merge.
|
||||
|
||||
name: Adversarial Review (Gadfly)
|
||||
|
||||
@@ -32,61 +29,27 @@ concurrency:
|
||||
jobs:
|
||||
review:
|
||||
# Security: only trusted users may trigger a secret-bearing run via a PR
|
||||
# comment (pull_request + workflow_dispatch are already trusted). Mirrors
|
||||
# GADFLY_ALLOWED_USERS, the in-container belt-and-suspenders check.
|
||||
# comment (pull_request + workflow_dispatch are already trusted). Mirrors the
|
||||
# allowed_users input below (the in-container belt-and-suspenders check) — both
|
||||
# lists must stay in sync; a workflow if: can't read a workflow_call input.
|
||||
if: >-
|
||||
github.event_name != 'issue_comment'
|
||||
|| (github.event.issue.pull_request
|
||||
&& (github.actor == 'steve'
|
||||
|| github.actor == 'fizi'
|
||||
|| github.actor == 'dazed'))
|
||||
runs-on: ubuntu-latest
|
||||
# Full fleet: 3 cloud (lens fan-out) + M1/M5 Macs via foreman. The slow local
|
||||
# lanes dominate wall time, so allow plenty of headroom.
|
||||
timeout-minutes: 90
|
||||
steps:
|
||||
- uses: docker://gitea.stevedudenhoeffer.com/steve/gadfly:sha-d7f364d
|
||||
env:
|
||||
GITEA_API: ${{ github.server_url }}/api/v1/repos/${{ github.repository }}
|
||||
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
||||
OLLAMA_CLOUD_API_KEY: ${{ secrets.OLLAMA_CLOUD_API_KEY }}
|
||||
# Local Macs, reached through their foreman queues (native Ollama on the
|
||||
# wire). GADFLY_ENDPOINT_M5 registers provider "m5",
|
||||
# each a foreman-preset Ollama client at the secret's URL, of the form:
|
||||
# foreman|https://<foreman-host>|<token>
|
||||
# Needs an image with foreman provider-type support (this one). If a Mac
|
||||
# is offline that model's comment shows an error and the others still post.
|
||||
# (Gitea secrets aren't auto-exposed — map each explicitly.)
|
||||
GADFLY_ENDPOINT_M5: ${{ secrets.GADFLY_ENDPOINT_M5 }}
|
||||
# Full fleet: 3 cloud + M1 Pro + M5 Max. The Macs are back so the
|
||||
# gadfly-reports scoreboard can quantify whether they earn their keep
|
||||
# (they previously took 26–29 min for ZERO real findings — now measured).
|
||||
# Cloud concurrency lives in the LENSES: one cloud model at a time
|
||||
# (ollama-cloud=1) with its 3 lenses concurrent (LENS ollama-cloud=3) so
|
||||
# its comment lands sooner; each Mac runs one model, lenses serial (its
|
||||
# foreman queue serializes anyway). All three provider lanes run parallel.
|
||||
GADFLY_MODELS: "minimax-m3:cloud,glm-5.2:cloud,glm-5.1:cloud,kimi-k2.7-code:cloud,deepseek-v4-pro:cloud,nemotron-3-super:cloud,gpt-oss:120b-cloud,qwen3-coder:480b-cloud,gemma4:cloud,m5/qwen3.6:35b-mlx"
|
||||
GADFLY_PROVIDER_CONCURRENCY: "ollama-cloud=3,m5=1"
|
||||
GADFLY_PROVIDER_LENS_CONCURRENCY: "ollama-cloud=3"
|
||||
# Default => the 3-lens suite (security, correctness, error-handling).
|
||||
# Set the repo var GADFLY_SPECIALISTS to override (csv / "all" / "auto").
|
||||
GADFLY_SPECIALISTS: ${{ vars.GADFLY_SPECIALISTS || 'security,correctness,error-handling' }}
|
||||
# Per-lens deadline + bounded steps so the slow local models stay sane.
|
||||
GADFLY_TIMEOUT_SECS: "600"
|
||||
GADFLY_MAX_STEPS: "14"
|
||||
# Allow-list for the comment trigger (mirrors the job-level if: guard).
|
||||
GADFLY_ALLOWED_USERS: "steve,fizi,dazed"
|
||||
# --- findings telemetry: POST runs + findings to the gadfly-reports store ---
|
||||
# Advisory & off unless GADFLY_FINDINGS_URL is set; failures only log to
|
||||
# stderr and never affect the review. GADFLY_REPO / GADFLY_PR are derived
|
||||
# in-container; the URL + token are user-scope secrets.
|
||||
GADFLY_FINDINGS_URL: ${{ secrets.GADFLY_FINDINGS_URL }}
|
||||
GADFLY_FINDINGS_TOKEN: ${{ secrets.GADFLY_FINDINGS_TOKEN }}
|
||||
# --- event context (leave as-is) ---
|
||||
EVENT_NAME: ${{ github.event_name }}
|
||||
PR: ${{ github.event.pull_request.number || github.event.issue.number || github.event.inputs.pr_number }}
|
||||
PR_BRANCH: ${{ github.head_ref }}
|
||||
IS_DRAFT: ${{ github.event.pull_request.draft }}
|
||||
COMMENT_BODY: ${{ github.event.comment.body }}
|
||||
COMMENT_ID: ${{ github.event.comment.id }}
|
||||
ACTOR: ${{ github.actor }}
|
||||
# Pinned to an immutable gadfly commit (not @v1): our act_runners are long-lived
|
||||
# and cache the reusable-workflow ref, so a moved v1 tag keeps resolving to the
|
||||
# stale cached copy. A unique sha forces a cache miss → fresh fetch. Bump this
|
||||
# sha to adopt central swarm changes.
|
||||
uses: steve/gadfly/.gitea/workflows/review-reusable.yml@7bc3c982fa7b72367034c673f7812bf05e9c503e
|
||||
# Least privilege: forward only the review secrets (not `secrets: inherit`,
|
||||
# which would expose every repo secret). GITEA_TOKEN is the automatic token.
|
||||
secrets:
|
||||
OLLAMA_CLOUD_API_KEY: ${{ secrets.OLLAMA_CLOUD_API_KEY }}
|
||||
CLAUDE_CODE_OAUTH_TOKEN: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }}
|
||||
GADFLY_FINDINGS_URL: ${{ secrets.GADFLY_FINDINGS_URL }}
|
||||
GADFLY_FINDINGS_TOKEN: ${{ secrets.GADFLY_FINDINGS_TOKEN }}
|
||||
with:
|
||||
# Consumer-specific allow-list; everything else is inherited.
|
||||
allowed_users: "steve,fizi,dazed"
|
||||
|
||||
@@ -37,7 +37,7 @@ bot) — mort and gadfly are the first two consumers (heavy and light). See
|
||||
tool registry, majordomo's agent loop, context compaction, run-bounding, and
|
||||
step/audit instrumentation into one `Run(ctx, RunnableAgent, inv) Result`, with
|
||||
every host concern behind a nil-safe `run.Ports` (Audit/Budget/Critic/
|
||||
Checkpointer/PaletteSource/Delivery). See `examples/minimal`.
|
||||
Checkpointer/PaletteSource/Delivery/InputFiles). See `examples/minimal`.
|
||||
- `model/` — config-driven tier resolution + failover over majordomo, with
|
||||
pluggable `UsageSink`/`TraceSink` and `GenerateWith[T]` structured output.
|
||||
- `tool/` — the tool registry + 3-stage permission model + SSRF guard.
|
||||
|
||||
+19
-4
@@ -18,6 +18,7 @@ package critic
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"log/slog"
|
||||
"math"
|
||||
"sync"
|
||||
@@ -162,12 +163,15 @@ type handle struct {
|
||||
iterations int
|
||||
maxSteps int // current tool-dispatch ceiling (base MaxIterations, raised by RaiseStepsBy)
|
||||
lastTool string
|
||||
killed bool // sticky: once an Escalator kills, no later decision un-kills it
|
||||
killed bool // sticky: once an Escalator kills, no later decision un-kills it
|
||||
killCause error // non-nil once killed; surfaced via KillCause for "killed" status
|
||||
stopped bool
|
||||
stopCh chan struct{}
|
||||
}
|
||||
|
||||
func (h *handle) RecordStep(iter int) {
|
||||
func (h *handle) RecordStep(iter int, _ *llm.Response) {
|
||||
// This battery's Progress tracks iteration count + activity, not per-step
|
||||
// payload, so the response is unused here; a richer Escalator could record it.
|
||||
h.mu.Lock()
|
||||
h.iterations = iter
|
||||
h.lastActivity = h.now()
|
||||
@@ -204,6 +208,12 @@ func (h *handle) MaxSteps() int {
|
||||
return h.maxSteps
|
||||
}
|
||||
|
||||
func (h *handle) KillCause() error {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
return h.killCause
|
||||
}
|
||||
|
||||
func (h *handle) Stop() {
|
||||
h.mu.Lock()
|
||||
if !h.stopped {
|
||||
@@ -266,8 +276,13 @@ func (h *handle) tick(ctx context.Context) {
|
||||
}
|
||||
if d.Kill {
|
||||
h.killed = true
|
||||
h.deadline = h.now() // immediate hard deadline → executor cancels
|
||||
return // ignore any Nudge/ExtendBy paired with a Kill
|
||||
reason := d.KillReason
|
||||
if reason == "" {
|
||||
reason = "critic killed the run"
|
||||
}
|
||||
h.killCause = errors.New(reason) // surfaced via KillCause → "killed" status
|
||||
h.deadline = h.now() // immediate hard deadline → executor cancels
|
||||
return // ignore any Nudge/ExtendBy paired with a Kill
|
||||
}
|
||||
if len(d.Nudge) > 0 {
|
||||
h.steer = append(h.steer, d.Nudge...)
|
||||
|
||||
@@ -51,7 +51,7 @@ func TestMonitorEscalatesOncePerIdlePeriodAndExtends(t *testing.T) {
|
||||
t.Error("deadline should have been extended past the original")
|
||||
}
|
||||
// A fresh step re-arms; another idle period escalates again.
|
||||
h.RecordStep(1)
|
||||
h.RecordStep(1, nil)
|
||||
time.Sleep(60 * time.Millisecond)
|
||||
mu.Lock()
|
||||
c2 := calls
|
||||
|
||||
+24
-14
@@ -2,9 +2,11 @@ package run
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/agent"
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||
)
|
||||
|
||||
// criticDeadlineCheck is how often the deadline-watch goroutine polls the
|
||||
@@ -21,12 +23,15 @@ type criticBinding struct {
|
||||
}
|
||||
|
||||
// startCritic begins critic monitoring for this run when one is configured and
|
||||
// the agent enables it. It launches a goroutine that cancels runCtx (via cancel)
|
||||
// the moment the critic's hard deadline passes — the critic may extend that
|
||||
// deadline, so a healthy-but-slow run is given room while a hung one is killed.
|
||||
// Returns (nil, no-op stop) when there is no critic. The caller MUST defer the
|
||||
// returned stop.
|
||||
func (e *Executor) startCritic(runCtx context.Context, cancel context.CancelFunc, ra RunnableAgent, info RunInfo) (*criticBinding, func()) {
|
||||
// the agent enables it. It launches a goroutine that cancels runCtx (via
|
||||
// cancelCause) the moment the critic's hard deadline passes — the critic may
|
||||
// extend that deadline, so a healthy-but-slow run is given room while a hung one
|
||||
// is killed. When the deadline passes because the critic KILLED the run
|
||||
// (KillCause() != nil), the cancellation cause is ErrCriticKill (→ status
|
||||
// "killed"); when the backstop simply expired, it is context.DeadlineExceeded (→
|
||||
// "timeout"). Returns (nil, no-op stop) when there is no critic. The caller MUST
|
||||
// defer the returned stop.
|
||||
func (e *Executor) startCritic(runCtx context.Context, cancelCause context.CancelCauseFunc, ra RunnableAgent, info RunInfo) (*criticBinding, func()) {
|
||||
noop := func() {}
|
||||
if e.cfg.Ports.Critic == nil || !ra.Critic.Enabled {
|
||||
return nil, noop
|
||||
@@ -55,9 +60,14 @@ func (e *Executor) startCritic(runCtx context.Context, cancel context.CancelFunc
|
||||
return
|
||||
case <-t.C:
|
||||
// A zero deadline = no hard cap (not yet set); otherwise cancel
|
||||
// once we're at or past it.
|
||||
// once we're at or past it, distinguishing an explicit kill from a
|
||||
// natural backstop expiry so the run gets the right status.
|
||||
if d := h.Deadline(); !d.IsZero() && !time.Now().Before(d) {
|
||||
cancel()
|
||||
if cause := h.KillCause(); cause != nil {
|
||||
cancelCause(fmt.Errorf("%w: %s", ErrCriticKill, cause.Error()))
|
||||
} else {
|
||||
cancelCause(context.DeadlineExceeded)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -69,9 +79,9 @@ func (e *Executor) startCritic(runCtx context.Context, cancel context.CancelFunc
|
||||
}
|
||||
}
|
||||
|
||||
func (b *criticBinding) recordStep(iter int) {
|
||||
func (b *criticBinding) recordStep(iter int, resp *llm.Response) {
|
||||
if b != nil {
|
||||
b.h.RecordStep(iter)
|
||||
b.h.RecordStep(iter, resp)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -104,11 +114,11 @@ func (b *criticBinding) maxStepsOption(base int) agent.Option {
|
||||
})
|
||||
}
|
||||
|
||||
// steerOptions returns the agent RunOptions that drain the critic's steer
|
||||
// messages into the loop. Empty when there is no critic.
|
||||
func (b *criticBinding) steerOptions() []agent.RunOption {
|
||||
// drainSteer returns the critic's queued steer messages (nil-safe), so the
|
||||
// executor can merge them with the session steer mailbox into one WithSteer.
|
||||
func (b *criticBinding) drainSteer() []llm.Message {
|
||||
if b == nil {
|
||||
return nil
|
||||
}
|
||||
return []agent.RunOption{agent.WithSteer(b.h.Steer)}
|
||||
return b.h.Steer()
|
||||
}
|
||||
|
||||
+8
-2
@@ -23,10 +23,16 @@ type fakeCriticHandle struct {
|
||||
mu sync.Mutex
|
||||
steps, tools, stops int
|
||||
steered int
|
||||
maxSteps int // 0 => defer to the run's base MaxIterations
|
||||
maxSteps int // 0 => defer to the run's base MaxIterations
|
||||
killCause error // non-nil simulates a critic kill
|
||||
}
|
||||
|
||||
func (h *fakeCriticHandle) RecordStep(int) { h.mu.Lock(); h.steps++; h.mu.Unlock() }
|
||||
func (h *fakeCriticHandle) RecordStep(int, *llm.Response) { h.mu.Lock(); h.steps++; h.mu.Unlock() }
|
||||
func (h *fakeCriticHandle) KillCause() error {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
return h.killCause
|
||||
}
|
||||
func (h *fakeCriticHandle) RecordToolStart(string, string) {
|
||||
h.mu.Lock()
|
||||
h.tools++
|
||||
|
||||
+113
-10
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/agent"
|
||||
@@ -101,6 +102,10 @@ type Result struct {
|
||||
Steps []tool.Step
|
||||
Usage llm.Usage
|
||||
Err error
|
||||
// PostRunResult carries artifacts produced by a SessionToolFactory's PostRun
|
||||
// hook (rendered images, files). nil when no factory was set or PostRun
|
||||
// returned nil. The host delivers these (e.g. mort's chat API / Discord).
|
||||
PostRunResult *tool.PostRunResult
|
||||
}
|
||||
|
||||
// Run executes ra with the given invocation + input and returns the Result. It
|
||||
@@ -176,6 +181,12 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
||||
inv.RunState = stateAcc
|
||||
}
|
||||
|
||||
// Steer mailbox: lets session tools (via inv.AttachImages) feed multimodal
|
||||
// messages into the running conversation before its next step. Created BEFORE
|
||||
// the toolbox build so any tool's handler captures the live AttachImages seam.
|
||||
mailbox := &steerMailbox{}
|
||||
inv.AttachImages = (&runSession{mailbox: mailbox}).AttachImages
|
||||
|
||||
// Build the toolbox from the agent's low-level tools.
|
||||
toolbox, err := e.cfg.Registry.Build(ra.LowLevelTools, inv, tool.Visibility("private"), nil)
|
||||
if err != nil {
|
||||
@@ -192,20 +203,56 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
||||
return res
|
||||
}
|
||||
|
||||
// Per-invocation ExtraTools + a SessionToolFactory's per-run tools, added on
|
||||
// top of the agent's palette. The factory closes over the live session (the
|
||||
// AttachImages mailbox); its PostRun hook (held for after the run) produces
|
||||
// artifacts attached to res.PostRunResult, and its Cleanup is deferred. All
|
||||
// nil-safe.
|
||||
for _, t := range inv.ExtraTools {
|
||||
if err := toolbox.Add(t); err != nil {
|
||||
res.Err = fmt.Errorf("add extra tool: %w", err)
|
||||
e.finishAudit(ctx, rec, "error", res, started, res.Err)
|
||||
return res
|
||||
}
|
||||
}
|
||||
var postRun func(ctx context.Context, transcript []llm.Message, output string, runErr error) *tool.PostRunResult
|
||||
if inv.SessionToolFactory != nil {
|
||||
st := inv.SessionToolFactory(&runSession{mailbox: mailbox})
|
||||
if st.Cleanup != nil {
|
||||
defer safeCleanup(st.Cleanup) // panic-isolated, like runPostRun
|
||||
}
|
||||
for _, t := range st.Tools {
|
||||
if err := toolbox.Add(t); err != nil {
|
||||
res.Err = fmt.Errorf("add session tool: %w", err)
|
||||
e.finishAudit(ctx, rec, "error", res, started, res.Err)
|
||||
return res
|
||||
}
|
||||
}
|
||||
postRun = st.PostRun
|
||||
}
|
||||
|
||||
// Run context: bound by MaxRuntime, detached from the caller's deadline so a
|
||||
// lane/queue wait doesn't eat the run budget (mort's V10 lesson). Caller
|
||||
// cancellation still propagates via MergeCancellation. Created BEFORE the
|
||||
// step observer so the observer forwards the merged run context (not a
|
||||
// possibly-cancelled caller ctx) to OnStep consumers.
|
||||
runCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), maxRuntime)
|
||||
defer cancel()
|
||||
// MaxRuntime stays a WithTimeout so its DeadlineExceeded propagates through the
|
||||
// child chain (→ "timeout"), preserving the run's-own-timeout vs caller-cancel
|
||||
// distinction. A NESTED cause-carrying layer lets a critic kill surface as a
|
||||
// distinct "killed" without disturbing that: only an ErrCriticKill cause is
|
||||
// consulted in statusFor; a generic run error or a caller cancel is classified
|
||||
// by the run error itself.
|
||||
timeoutCtx, cancelTimeout := context.WithTimeout(context.WithoutCancel(ctx), maxRuntime)
|
||||
defer cancelTimeout()
|
||||
runCtx, cancelCause := context.WithCancelCause(timeoutCtx)
|
||||
defer cancelCause(nil)
|
||||
runCtx, mergeCancel := MergeCancellation(runCtx, ctx)
|
||||
defer mergeCancel()
|
||||
|
||||
// Critic (optional): monitors the run for a stall, can nudge/extend/kill via
|
||||
// its host Escalator. Its hard deadline is bound to runCtx (cancel on pass).
|
||||
// nil-safe: no-op when no critic is configured or the agent doesn't enable it.
|
||||
critic, stopCritic := e.startCritic(runCtx, cancel, ra, info)
|
||||
critic, stopCritic := e.startCritic(runCtx, cancelCause, ra, info)
|
||||
defer stopCritic()
|
||||
|
||||
// Step instrumentation: accumulate Result.Steps + fire inv.OnStep, feed the
|
||||
@@ -222,7 +269,7 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
||||
if rec != nil {
|
||||
rec.OnStep(s.Index, s.Response)
|
||||
}
|
||||
critic.recordStep(s.Index) // keep the critic's activity clock fresh
|
||||
critic.recordStep(s.Index, s.Response) // keep the critic's activity clock fresh + carry the step payload
|
||||
var calls []llm.ToolCall
|
||||
if s.Response != nil {
|
||||
calls = s.Response.ToolCalls
|
||||
@@ -271,9 +318,18 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
||||
}
|
||||
|
||||
ag := agent.New(model, e.systemPrompt(ra), opts...)
|
||||
runRes, runErr := ag.Run(runCtx, input, critic.steerOptions()...)
|
||||
// Stage non-image input attachments (audio/PDF/binary) into the host file
|
||||
// store and fold an [ATTACHED FILES] descriptor into the prompt so the agent
|
||||
// can reach them by file_id. No-op when Ports.InputFiles is nil or there are
|
||||
// no files. Done after the model/toolbox build but before the loop, so the
|
||||
// descriptor rides the very first user turn.
|
||||
input = e.stageInputFiles(runCtx, inv.RunID, ra.ID, inv.InputFiles, input)
|
||||
// One WithSteer drains BOTH the session mailbox (a tool's AttachImages) and
|
||||
// the critic's nudges before each step.
|
||||
steer := func() []llm.Message { return append(mailbox.drain(), critic.drainSteer()...) }
|
||||
runRes, runErr := runAgent(runCtx, ag, input, inv.Images, agent.WithSteer(steer))
|
||||
|
||||
status := statusFor(runErr)
|
||||
status := statusFor(runCtx, runErr)
|
||||
if runRes != nil {
|
||||
res.Output = runRes.Output
|
||||
res.Usage = runRes.Usage
|
||||
@@ -281,6 +337,19 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
||||
res.Steps = emitter.snapshot()
|
||||
res.Err = runErr
|
||||
|
||||
// PostRun: hand the SessionToolFactory's hook the full transcript (populated
|
||||
// even on partial results) so it can produce artifacts. Best-effort +
|
||||
// panic-isolated — a PostRun failure never fails an otherwise-successful run.
|
||||
if postRun != nil {
|
||||
var transcript []llm.Message
|
||||
if runRes != nil {
|
||||
transcript = runRes.Messages
|
||||
}
|
||||
// Detach from the caller's ctx: a finished/cancelled caller must not abort
|
||||
// artifact production (the hook owns its own bounding, per its contract).
|
||||
res.PostRunResult = runPostRun(detach(ctx), postRun, transcript, res.Output, runErr)
|
||||
}
|
||||
|
||||
e.finishAudit(ctx, rec, status, res, started, runErr)
|
||||
if e.cfg.Ports.Budget != nil {
|
||||
e.cfg.Ports.Budget.Commit(detach(ctx), inv.CallerID, time.Since(started).Seconds())
|
||||
@@ -289,13 +358,22 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
||||
return res
|
||||
}
|
||||
|
||||
// statusFor maps a run error to a RunStats.Status, distinguishing a deadline
|
||||
// (timeout) and a cancellation (cancelled — caller cancel or shutdown) from a
|
||||
// generic error so audit consumers can tell them apart.
|
||||
func statusFor(runErr error) string {
|
||||
// statusFor maps a run error to a RunStats.Status, distinguishing a critic kill
|
||||
// (killed), a deadline (timeout), and a cancellation (cancelled — caller cancel
|
||||
// or shutdown) from a generic error so audit consumers can tell them apart. The
|
||||
// run context's cancellation cause carries the distinction (ErrCriticKill /
|
||||
// DeadlineExceeded), since ctx.Err() alone only reports Canceled.
|
||||
func statusFor(runCtx context.Context, runErr error) string {
|
||||
switch {
|
||||
case runErr == nil:
|
||||
return "ok"
|
||||
// Only the kill is recovered from the cancellation cause — a critic kill
|
||||
// surfaces as a plain Canceled run error, so without this it'd read as
|
||||
// "cancelled". Everything else is classified by the run error itself, so a
|
||||
// genuine run error is never relabeled just because the context was later
|
||||
// cancelled, and a caller cancel/deadline stays "cancelled" (not "timeout").
|
||||
case errors.Is(context.Cause(runCtx), ErrCriticKill):
|
||||
return "killed"
|
||||
case errors.Is(runErr, context.DeadlineExceeded):
|
||||
return "timeout"
|
||||
case errors.Is(runErr, context.Canceled):
|
||||
@@ -369,3 +447,28 @@ func detach(ctx context.Context) context.Context {
|
||||
_ = cancel // bounded by the timeout; nothing to cancel early
|
||||
return c
|
||||
}
|
||||
|
||||
// runAgent dispatches the majordomo agent loop. majordomo's Run takes a text-only
|
||||
// input arg, so when the invocation carries images they're folded into the first
|
||||
// user message (text + image parts) via WithHistory and Run is called with an
|
||||
// empty input — the model then sees a multimodal opening turn. The image-less path
|
||||
// passes the prompt straight through.
|
||||
//
|
||||
// The text part is omitted when input is blank (image-only run), matching
|
||||
// runSession.AttachImages so no empty TextPart is sent.
|
||||
func runAgent(ctx context.Context, ag *agent.Agent, input string, images []llm.ImagePart, opts ...agent.RunOption) (*agent.Result, error) {
|
||||
if len(images) == 0 {
|
||||
return ag.Run(ctx, input, opts...)
|
||||
}
|
||||
parts := make([]llm.Part, 0, len(images)+1)
|
||||
if strings.TrimSpace(input) != "" {
|
||||
parts = append(parts, llm.Text(input))
|
||||
}
|
||||
for _, img := range images {
|
||||
parts = append(parts, img)
|
||||
}
|
||||
// Copy opts before appending so a caller-supplied backing array is never
|
||||
// mutated/aliased (the variadic slice can have spare capacity).
|
||||
opts = append(opts[:len(opts):len(opts)], agent.WithHistory([]llm.Message{llm.UserParts(parts...)}))
|
||||
return ag.Run(ctx, "", opts...)
|
||||
}
|
||||
|
||||
+20
-7
@@ -148,20 +148,33 @@ func TestExecutorNilModelNoPanic(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
// TestStatusFor maps run errors to RunStats.Status (gadfly F3).
|
||||
// TestStatusFor maps run errors + cancellation cause to RunStats.Status (gadfly F3).
|
||||
func TestStatusFor(t *testing.T) {
|
||||
bg := context.Background()
|
||||
// A context cancelled with the critic-kill cause: ctx.Err() is Canceled, but
|
||||
// context.Cause carries ErrCriticKill → "killed".
|
||||
killCtx, killCancel := context.WithCancelCause(context.Background())
|
||||
killCancel(fmt.Errorf("%w: hung", ErrCriticKill))
|
||||
// A context cancelled with a non-kill cause must NOT relabel a genuine run
|
||||
// error: a real error stays "error" even though the ctx was later cancelled.
|
||||
cancelledCtx, cc := context.WithCancelCause(context.Background())
|
||||
cc(context.DeadlineExceeded)
|
||||
cases := []struct {
|
||||
ctx context.Context
|
||||
err error
|
||||
want string
|
||||
}{
|
||||
{nil, "ok"},
|
||||
{context.DeadlineExceeded, "timeout"},
|
||||
{context.Canceled, "cancelled"},
|
||||
{fmt.Errorf("wrapped: %w", context.DeadlineExceeded), "timeout"},
|
||||
{errors.New("boom"), "error"},
|
||||
{bg, nil, "ok"},
|
||||
{bg, context.DeadlineExceeded, "timeout"},
|
||||
{bg, context.Canceled, "cancelled"},
|
||||
{bg, fmt.Errorf("wrapped: %w", context.DeadlineExceeded), "timeout"},
|
||||
{bg, errors.New("boom"), "error"},
|
||||
{killCtx, context.Canceled, "killed"},
|
||||
{cancelledCtx, errors.New("boom"), "error"}, // generic error not relabeled by cause
|
||||
{cancelledCtx, context.Canceled, "cancelled"}, // caller cancel stays cancelled, not timeout
|
||||
}
|
||||
for _, c := range cases {
|
||||
if got := statusFor(c.err); got != c.want {
|
||||
if got := statusFor(c.ctx, c.err); got != c.want {
|
||||
t.Errorf("statusFor(%v) = %q, want %q", c.err, got, c.want)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,121 @@
|
||||
package run_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/executus/run"
|
||||
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
||||
)
|
||||
|
||||
// TestExecutorFoldsInitialImages: when the invocation carries Images, they're
|
||||
// folded into the first user message (alongside the prompt text) instead of being
|
||||
// dropped — majordomo's Run input arg is text-only, so the executor seeds the
|
||||
// multimodal opening turn via history.
|
||||
func TestExecutorFoldsInitialImages(t *testing.T) {
|
||||
fp := fake.New("fake")
|
||||
fp.Enqueue("m", fake.Reply("saw the image"))
|
||||
m, _ := fp.Model("m")
|
||||
|
||||
img := llm.ImagePart{MIME: "image/png", Data: []byte("PNGDATA")}
|
||||
inv := tool.Invocation{RunID: "r1", Images: []llm.ImagePart{img}}
|
||||
ex := run.New(run.Config{
|
||||
Registry: tool.NewRegistry(),
|
||||
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
|
||||
})
|
||||
res := ex.Run(context.Background(), run.RunnableAgent{ModelTier: "m"}, inv, "describe this")
|
||||
if res.Err != nil {
|
||||
t.Fatalf("run error: %v", res.Err)
|
||||
}
|
||||
|
||||
calls := fp.Calls()
|
||||
if len(calls) == 0 {
|
||||
t.Fatal("no model calls recorded")
|
||||
}
|
||||
// The text + image must be CO-LOCATED in a single user message (not split
|
||||
// across two), so the model reads them as one multimodal turn.
|
||||
coLocated := false
|
||||
for _, msg := range calls[0].Request.Messages {
|
||||
sawImage, sawText := false, false
|
||||
for _, p := range msg.Parts {
|
||||
switch pp := p.(type) {
|
||||
case llm.ImagePart:
|
||||
if string(pp.Data) == "PNGDATA" {
|
||||
sawImage = true
|
||||
}
|
||||
case llm.TextPart:
|
||||
if strings.Contains(pp.Text, "describe this") {
|
||||
sawText = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if sawImage && sawText {
|
||||
coLocated = true
|
||||
}
|
||||
}
|
||||
if !coLocated {
|
||||
t.Error("image + prompt text were not folded into the SAME user message")
|
||||
}
|
||||
}
|
||||
|
||||
// TestExecutorImageOnlyNoBlankText: an image-only run (blank prompt) must NOT emit
|
||||
// an empty TextPart — the message carries just the image, matching
|
||||
// runSession.AttachImages's guard.
|
||||
func TestExecutorImageOnlyNoBlankText(t *testing.T) {
|
||||
fp := fake.New("fake")
|
||||
fp.Enqueue("m", fake.Reply("saw it"))
|
||||
m, _ := fp.Model("m")
|
||||
|
||||
inv := tool.Invocation{RunID: "r3", Images: []llm.ImagePart{{MIME: "image/png", Data: []byte("IMG")}}}
|
||||
ex := run.New(run.Config{
|
||||
Registry: tool.NewRegistry(),
|
||||
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
|
||||
})
|
||||
res := ex.Run(context.Background(), run.RunnableAgent{ModelTier: "m"}, inv, " ")
|
||||
if res.Err != nil {
|
||||
t.Fatalf("run error: %v", res.Err)
|
||||
}
|
||||
for _, msg := range fp.Calls()[0].Request.Messages {
|
||||
for _, p := range msg.Parts {
|
||||
if tp, ok := p.(llm.TextPart); ok && strings.TrimSpace(tp.Text) == "" {
|
||||
t.Error("image-only run emitted a blank TextPart")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestExecutorTextOnlyUnchanged: with no Images, the prompt flows through as the
|
||||
// text input (regression guard that the fold path didn't break the common case).
|
||||
func TestExecutorTextOnlyUnchanged(t *testing.T) {
|
||||
fp := fake.New("fake")
|
||||
fp.Enqueue("m", fake.Reply("ok"))
|
||||
m, _ := fp.Model("m")
|
||||
|
||||
ex := run.New(run.Config{
|
||||
Registry: tool.NewRegistry(),
|
||||
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
|
||||
})
|
||||
res := ex.Run(context.Background(), run.RunnableAgent{ModelTier: "m"}, tool.Invocation{RunID: "r2"}, "plain prompt")
|
||||
if res.Err != nil {
|
||||
t.Fatalf("run error: %v", res.Err)
|
||||
}
|
||||
calls := fp.Calls()
|
||||
if len(calls) == 0 {
|
||||
t.Fatal("no model calls recorded")
|
||||
}
|
||||
sawText := false
|
||||
for _, msg := range calls[0].Request.Messages {
|
||||
for _, p := range msg.Parts {
|
||||
if tp, ok := p.(llm.TextPart); ok && strings.Contains(tp.Text, "plain prompt") {
|
||||
sawText = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if !sawText {
|
||||
t.Error("text-only prompt did not reach the model")
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,179 @@
|
||||
package run
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"path"
|
||||
"strings"
|
||||
"unicode"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
||||
)
|
||||
|
||||
// maxInputFileBytes is a defense-in-depth cap at the staging boundary. A host's
|
||||
// extraction path may already cap downloads, but stageInputFiles is the trust
|
||||
// boundary for the InputFiles seam: a call site or bug that populates InputFiles
|
||||
// directly must not write an unbounded blob to the host file store.
|
||||
const maxInputFileBytes = 50_000_000
|
||||
|
||||
// maxInputFiles bounds how many attachments a single run stages, independent of
|
||||
// the per-file byte cap — defense-in-depth against a flood of tiny files.
|
||||
const maxInputFiles = 32
|
||||
|
||||
// stageInputFiles persists each non-image input attachment into the host file
|
||||
// store (Ports.InputFiles) under run scope and appends a descriptor block to the
|
||||
// prompt so the agent knows the file_ids it can pass to a worker tool. The bytes
|
||||
// are NOT inlined into the model context — the LLM can't read raw audio/binary —
|
||||
// so the agent reaches them via a file_id-aware tool (e.g. code_exec files_in,
|
||||
// which writes the file to /workspace/<name>).
|
||||
//
|
||||
// Best-effort: a nil stager, no files, or a per-file save error degrades to
|
||||
// "skip that file" — the run still proceeds. Returns the (possibly augmented)
|
||||
// prompt.
|
||||
func (e *Executor) stageInputFiles(ctx context.Context, runID, agentID string, files []tool.InputFile, prompt string) string {
|
||||
if e.cfg.Ports.InputFiles == nil || len(files) == 0 {
|
||||
return prompt
|
||||
}
|
||||
// Count cap: bound how many attachments one run can stage, independent of the
|
||||
// per-file byte cap (defense-in-depth against a flood of tiny files).
|
||||
if len(files) > maxInputFiles {
|
||||
slog.Warn("run: too many input files, truncating",
|
||||
"agent", agentID, "run_id", runID, "count", len(files), "cap", maxInputFiles)
|
||||
files = files[:maxInputFiles]
|
||||
}
|
||||
|
||||
type stagedFile struct {
|
||||
name, mime, fileID string
|
||||
size int
|
||||
}
|
||||
var staged []stagedFile
|
||||
seenNames := make(map[string]int, len(files))
|
||||
for _, f := range files {
|
||||
if len(f.Data) == 0 {
|
||||
slog.Warn("run: skipping empty input file",
|
||||
"agent", agentID, "run_id", runID, "name", f.Name)
|
||||
continue
|
||||
}
|
||||
if len(f.Data) > maxInputFileBytes {
|
||||
slog.Warn("run: skipping oversized input file",
|
||||
"agent", agentID, "run_id", runID, "name", f.Name,
|
||||
"size", len(f.Data), "cap", maxInputFileBytes)
|
||||
continue
|
||||
}
|
||||
// Reduce the untrusted filename to a safe base name BEFORE staging or
|
||||
// inlining: strips ../ and absolute-path components (so it can't escape
|
||||
// the host store or /workspace/<name>) and drops control chars/newlines
|
||||
// (so a crafted name can't inject text into the descriptor block below).
|
||||
// Then disambiguate colliding base names so two attachments don't both map
|
||||
// to /workspace/<name> (the second would clobber the first).
|
||||
name := uniqueName(sanitizeName(f.Name), seenNames)
|
||||
// Sanitize the mime ONCE and pass the clean value to both the host store
|
||||
// and the descriptor (don't hand the raw value to StageInputFile).
|
||||
mime := sanitizeField(f.MimeType)
|
||||
fileID, err := e.cfg.Ports.InputFiles.StageInputFile(ctx, runID, agentID, name, mime, f.Data)
|
||||
if err != nil {
|
||||
slog.Warn("run: failed to stage input file",
|
||||
"agent", agentID, "run_id", runID, "name", name, "error", err)
|
||||
continue
|
||||
}
|
||||
if fileID == "" {
|
||||
slog.Warn("run: stager returned empty file_id, skipping",
|
||||
"agent", agentID, "run_id", runID, "name", name)
|
||||
continue
|
||||
}
|
||||
// fileID is host-generated, but sanitize it too before inlining — the
|
||||
// descriptor must never carry control chars no matter the stager impl.
|
||||
staged = append(staged, stagedFile{name: name, mime: mime, fileID: sanitizeField(fileID), size: len(f.Data)})
|
||||
}
|
||||
if len(staged) == 0 {
|
||||
return prompt
|
||||
}
|
||||
|
||||
var b strings.Builder
|
||||
b.WriteString("[ATTACHED FILES]\n")
|
||||
b.WriteString("The user attached the following file(s). Their contents are NOT included in this prompt and you cannot read them directly. ")
|
||||
b.WriteString("To work with one, call the code_exec tool with a files_in entry — e.g. ")
|
||||
b.WriteString(`files_in: [{"name": "<name>", "file_id": "<file_id>"}]`)
|
||||
b.WriteString(" — which writes it to /workspace/<name> inside the Python sandbox. You may also pass a file_id to any other tool that accepts one.\n")
|
||||
for _, s := range staged {
|
||||
fmt.Fprintf(&b, "- %s (%s, %s) → file_id: %s\n", s.name, s.mime, humanizeBytes(s.size), s.fileID)
|
||||
}
|
||||
|
||||
if strings.TrimSpace(prompt) == "" {
|
||||
return b.String()
|
||||
}
|
||||
return prompt + "\n\n" + b.String()
|
||||
}
|
||||
|
||||
// sanitizeName reduces an untrusted attachment filename to a safe base name. It
|
||||
// drops control characters / newlines (which would otherwise let a crafted name
|
||||
// inject text into the [ATTACHED FILES] descriptor) and strips every directory
|
||||
// component — defeating ../ traversal, nested dirs, and absolute / drive paths
|
||||
// both in the host file store and at /workspace/<name>. Returns "attachment"
|
||||
// when nothing usable remains (empty, ".", "..").
|
||||
func sanitizeName(name string) string {
|
||||
name = sanitizeField(name)
|
||||
// Normalize backslashes so a Windows-style path also reduces to its base.
|
||||
base := path.Base(strings.ReplaceAll(name, `\`, "/"))
|
||||
base = strings.TrimSpace(base)
|
||||
if base == "" || base == "." || base == ".." {
|
||||
return "attachment"
|
||||
}
|
||||
return base
|
||||
}
|
||||
|
||||
// sanitizeField strips characters that could let a value inlined verbatim into
|
||||
// the prompt descriptor break out of its line or visually mislead: control
|
||||
// characters (IsControl covers newlines/tabs) AND Unicode format characters
|
||||
// (category Cf — e.g. the bidi overrides U+202A–U+202E, which can reorder how
|
||||
// the descriptor renders).
|
||||
func sanitizeField(s string) string {
|
||||
return strings.Map(func(r rune) rune {
|
||||
if unicode.IsControl(r) || unicode.Is(unicode.Cf, r) {
|
||||
return -1
|
||||
}
|
||||
return r
|
||||
}, s)
|
||||
}
|
||||
|
||||
// uniqueName returns name unchanged the first time it's seen, then name-2,
|
||||
// name-3, … (suffix inserted before the extension) on repeats, recording each
|
||||
// result in seen so later collisions keep counting up.
|
||||
func uniqueName(name string, seen map[string]int) string {
|
||||
if seen[name] == 0 {
|
||||
seen[name]++
|
||||
return name
|
||||
}
|
||||
ext := path.Ext(name)
|
||||
base := strings.TrimSuffix(name, ext)
|
||||
for {
|
||||
seen[name]++
|
||||
candidate := fmt.Sprintf("%s-%d%s", base, seen[name], ext)
|
||||
if seen[candidate] == 0 {
|
||||
seen[candidate]++
|
||||
return candidate
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// humanizeBytes renders a byte count as a short human-readable string (e.g.
|
||||
// "2.1 MB") for the attached-files descriptor block.
|
||||
func humanizeBytes(n int) string {
|
||||
if n < 0 {
|
||||
n = 0
|
||||
}
|
||||
const unit = 1024
|
||||
if n < unit {
|
||||
return fmt.Sprintf("%d B", n)
|
||||
}
|
||||
const prefixes = "KMGTPE"
|
||||
div, exp := int64(unit), 0
|
||||
// Clamp exp to the last prefix so an absurd size (≥1024^7) can't index past
|
||||
// "KMGTPE" and panic — a no-panic guarantee independent of the per-file cap.
|
||||
for v := int64(n) / unit; v >= unit && exp < len(prefixes)-1; v /= unit {
|
||||
div *= unit
|
||||
exp++
|
||||
}
|
||||
return fmt.Sprintf("%.1f %cB", float64(n)/float64(div), prefixes[exp])
|
||||
}
|
||||
@@ -0,0 +1,243 @@
|
||||
package run
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
||||
)
|
||||
|
||||
// stagerFunc is a test InputFileStager: it records each staged file and returns
|
||||
// a deterministic file_id ("file_<name>"), or an error if err is set.
|
||||
type stagerFunc struct {
|
||||
staged []stagedRec
|
||||
err error
|
||||
}
|
||||
|
||||
type stagedRec struct {
|
||||
runID, agentID, name, mime string
|
||||
size int
|
||||
}
|
||||
|
||||
func (s *stagerFunc) StageInputFile(_ context.Context, runID, agentID, name, mime string, content []byte) (string, error) {
|
||||
if s.err != nil {
|
||||
return "", s.err
|
||||
}
|
||||
s.staged = append(s.staged, stagedRec{runID, agentID, name, mime, len(content)})
|
||||
return "file_" + name, nil
|
||||
}
|
||||
|
||||
func newStagerExecutor(s InputFileStager) *Executor {
|
||||
return New(Config{
|
||||
Registry: tool.NewRegistry(),
|
||||
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, nil, nil },
|
||||
Ports: Ports{InputFiles: s},
|
||||
})
|
||||
}
|
||||
|
||||
// TestStageInputFiles: files are staged via the port and an [ATTACHED FILES]
|
||||
// descriptor (with each file_id) is appended to the prompt.
|
||||
func TestStageInputFiles(t *testing.T) {
|
||||
st := &stagerFunc{}
|
||||
ex := newStagerExecutor(st)
|
||||
out := ex.stageInputFiles(context.Background(), "run-1", "agent-1",
|
||||
[]tool.InputFile{{Name: "clip.mp3", MimeType: "audio/mpeg", Data: []byte("abcd")}},
|
||||
"transcribe this")
|
||||
|
||||
if len(st.staged) != 1 || st.staged[0].name != "clip.mp3" {
|
||||
t.Fatalf("staged = %+v, want one clip.mp3", st.staged)
|
||||
}
|
||||
if st.staged[0].runID != "run-1" || st.staged[0].agentID != "agent-1" {
|
||||
t.Errorf("stager got runID/agentID = %q/%q, want run-1/agent-1", st.staged[0].runID, st.staged[0].agentID)
|
||||
}
|
||||
for _, want := range []string{"transcribe this", "[ATTACHED FILES]", "clip.mp3", "file_clip.mp3", "audio/mpeg"} {
|
||||
if !strings.Contains(out, want) {
|
||||
t.Errorf("output missing %q:\n%s", want, out)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestStageInputFilesNoStager: a nil port leaves the prompt untouched and never
|
||||
// drops the run.
|
||||
func TestStageInputFilesNoStager(t *testing.T) {
|
||||
ex := newStagerExecutor(nil) // Ports.InputFiles == nil
|
||||
out := ex.stageInputFiles(context.Background(), "r", "a",
|
||||
[]tool.InputFile{{Name: "x.bin", Data: []byte("z")}}, "prompt")
|
||||
if out != "prompt" {
|
||||
t.Errorf("nil stager changed the prompt: %q", out)
|
||||
}
|
||||
}
|
||||
|
||||
// TestStageInputFilesNoFiles: no attachments leaves the prompt untouched.
|
||||
func TestStageInputFilesNoFiles(t *testing.T) {
|
||||
ex := newStagerExecutor(&stagerFunc{})
|
||||
out := ex.stageInputFiles(context.Background(), "r", "a", nil, "prompt")
|
||||
if out != "prompt" {
|
||||
t.Errorf("no files changed the prompt: %q", out)
|
||||
}
|
||||
}
|
||||
|
||||
// TestStageInputFilesDedup: colliding base names are disambiguated so they don't
|
||||
// clobber each other at /workspace/<name>.
|
||||
func TestStageInputFilesDedup(t *testing.T) {
|
||||
st := &stagerFunc{}
|
||||
ex := newStagerExecutor(st)
|
||||
out := ex.stageInputFiles(context.Background(), "r", "a", []tool.InputFile{
|
||||
{Name: "a.wav", MimeType: "audio/wav", Data: []byte("1")},
|
||||
{Name: "a.wav", MimeType: "audio/wav", Data: []byte("2")},
|
||||
}, "go")
|
||||
if len(st.staged) != 2 {
|
||||
t.Fatalf("staged %d files, want 2", len(st.staged))
|
||||
}
|
||||
if st.staged[0].name != "a.wav" || st.staged[1].name != "a-2.wav" {
|
||||
t.Errorf("dedup names = %q, %q; want a.wav, a-2.wav", st.staged[0].name, st.staged[1].name)
|
||||
}
|
||||
if !strings.Contains(out, "a-2.wav") {
|
||||
t.Errorf("output missing disambiguated name:\n%s", out)
|
||||
}
|
||||
}
|
||||
|
||||
// TestStageInputFilesSkipsBad: empty + oversized files are skipped; a save error
|
||||
// drops only that file. With nothing staged, the prompt is unchanged.
|
||||
func TestStageInputFilesSkipsBad(t *testing.T) {
|
||||
// Empty data → skipped; with no good files the prompt is returned as-is.
|
||||
ex := newStagerExecutor(&stagerFunc{})
|
||||
if out := ex.stageInputFiles(context.Background(), "r", "a",
|
||||
[]tool.InputFile{{Name: "empty.bin", Data: nil}}, "p"); out != "p" {
|
||||
t.Errorf("empty file should be skipped, got %q", out)
|
||||
}
|
||||
// A stager error → that file is dropped; nothing staged → prompt unchanged.
|
||||
exErr := newStagerExecutor(&stagerFunc{err: errors.New("disk full")})
|
||||
if out := exErr.stageInputFiles(context.Background(), "r", "a",
|
||||
[]tool.InputFile{{Name: "x.bin", Data: []byte("z")}}, "p"); out != "p" {
|
||||
t.Errorf("save error should drop the file and leave the prompt, got %q", out)
|
||||
}
|
||||
}
|
||||
|
||||
// TestStageInputFilesOversize: a file past the byte cap is skipped (prompt
|
||||
// unchanged), exercising the size guard directly.
|
||||
func TestStageInputFilesOversize(t *testing.T) {
|
||||
st := &stagerFunc{}
|
||||
ex := newStagerExecutor(st)
|
||||
big := make([]byte, maxInputFileBytes+1)
|
||||
out := ex.stageInputFiles(context.Background(), "r", "a",
|
||||
[]tool.InputFile{{Name: "huge.bin", Data: big}}, "p")
|
||||
if out != "p" || len(st.staged) != 0 {
|
||||
t.Errorf("oversized file should be skipped: out=%q staged=%d", out, len(st.staged))
|
||||
}
|
||||
}
|
||||
|
||||
// TestStageInputFilesCountCap: more than maxInputFiles attachments are truncated
|
||||
// to the cap.
|
||||
func TestStageInputFilesCountCap(t *testing.T) {
|
||||
st := &stagerFunc{}
|
||||
ex := newStagerExecutor(st)
|
||||
files := make([]tool.InputFile, maxInputFiles+5)
|
||||
for i := range files {
|
||||
files[i] = tool.InputFile{Name: "f.bin", Data: []byte("x")}
|
||||
}
|
||||
ex.stageInputFiles(context.Background(), "r", "a", files, "p")
|
||||
if len(st.staged) != maxInputFiles {
|
||||
t.Errorf("count cap: staged %d, want %d", len(st.staged), maxInputFiles)
|
||||
}
|
||||
}
|
||||
|
||||
// TestSanitizeName: traversal + absolute + control-char filenames are reduced to
|
||||
// a safe base name (no path separators, no newlines), with a fallback.
|
||||
func TestSanitizeName(t *testing.T) {
|
||||
cases := map[string]string{
|
||||
"../../etc/passwd": "passwd",
|
||||
"/etc/cron.d/x": "x",
|
||||
`..\..\windows\sys`: "sys",
|
||||
"clip.mp3": "clip.mp3",
|
||||
"": "attachment",
|
||||
"..": "attachment",
|
||||
".": "attachment",
|
||||
"evil\n- injected": "evil- injected",
|
||||
"a/b/c.wav": "c.wav",
|
||||
}
|
||||
for in, want := range cases {
|
||||
if got := sanitizeName(in); got != want {
|
||||
t.Errorf("sanitizeName(%q) = %q, want %q", in, got, want)
|
||||
}
|
||||
// A sanitized name must never carry a path separator or newline.
|
||||
got := sanitizeName(in)
|
||||
if strings.ContainsAny(got, "/\\\n\r") {
|
||||
t.Errorf("sanitizeName(%q) = %q still contains a separator/newline", in, got)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestStageInputFilesSanitizesTraversal: a traversal filename is staged AND
|
||||
// described under its safe base name only.
|
||||
func TestStageInputFilesSanitizesTraversal(t *testing.T) {
|
||||
st := &stagerFunc{}
|
||||
ex := newStagerExecutor(st)
|
||||
out := ex.stageInputFiles(context.Background(), "r", "a",
|
||||
[]tool.InputFile{{Name: "../../../etc/passwd", MimeType: "text/plain", Data: []byte("x")}}, "go")
|
||||
if len(st.staged) != 1 || st.staged[0].name != "passwd" {
|
||||
t.Fatalf("staged name = %+v, want passwd", st.staged)
|
||||
}
|
||||
if strings.Contains(out, "..") || strings.Contains(out, "/etc/") {
|
||||
t.Errorf("descriptor leaked the traversal path:\n%s", out)
|
||||
}
|
||||
}
|
||||
|
||||
// TestSanitizeFieldStripsBidiAndControl: control chars AND Unicode format/bidi
|
||||
// overrides are removed from inlined values.
|
||||
func TestSanitizeFieldStripsBidiAndControl(t *testing.T) {
|
||||
in := "audio/mpg\n; rm -rf" // bidi override + newline
|
||||
got := sanitizeField(in)
|
||||
if strings.ContainsAny(got, "\n\r\t") || strings.ContainsRune(got, '') {
|
||||
t.Errorf("sanitizeField left control/bidi chars: %q", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestStageInputFilesSanitizesMime: a mime with a control char is cleaned in BOTH
|
||||
// the staged value and the descriptor.
|
||||
func TestStageInputFilesSanitizesMime(t *testing.T) {
|
||||
st := &stagerFunc{}
|
||||
ex := newStagerExecutor(st)
|
||||
out := ex.stageInputFiles(context.Background(), "r", "a",
|
||||
[]tool.InputFile{{Name: "c.wav", MimeType: "audio/wav\ninjected", Data: []byte("x")}}, "go")
|
||||
if len(st.staged) != 1 || strings.ContainsAny(st.staged[0].mime, "\n\r") {
|
||||
t.Errorf("mime not sanitized before staging: %+v", st.staged)
|
||||
}
|
||||
if strings.Contains(out, "\ninjected") {
|
||||
t.Errorf("descriptor carried an unsanitized mime newline:\n%s", out)
|
||||
}
|
||||
}
|
||||
|
||||
// TestStageInputFilesEmptyFileID: a stager returning an empty file_id drops the
|
||||
// file (no blank file_id in the descriptor).
|
||||
func TestStageInputFilesEmptyFileID(t *testing.T) {
|
||||
ex := newStagerExecutor(emptyIDStager{})
|
||||
out := ex.stageInputFiles(context.Background(), "r", "a",
|
||||
[]tool.InputFile{{Name: "x.bin", Data: []byte("z")}}, "p")
|
||||
if out != "p" {
|
||||
t.Errorf("empty file_id should drop the file, got %q", out)
|
||||
}
|
||||
}
|
||||
|
||||
type emptyIDStager struct{}
|
||||
|
||||
func (emptyIDStager) StageInputFile(context.Context, string, string, string, string, []byte) (string, error) {
|
||||
return "", nil
|
||||
}
|
||||
|
||||
// TestHumanizeBytesNoPanic: an absurd size clamps to the last prefix instead of
|
||||
// indexing past "KMGTPE".
|
||||
func TestHumanizeBytesNoPanic(t *testing.T) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
t.Fatalf("humanizeBytes panicked: %v", r)
|
||||
}
|
||||
}()
|
||||
for _, n := range []int{0, 512, 2048, 5_000_000, 1 << 62} {
|
||||
_ = humanizeBytes(n)
|
||||
}
|
||||
}
|
||||
+31
-2
@@ -2,6 +2,7 @@ package run
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||
@@ -9,6 +10,12 @@ import (
|
||||
"gitea.stevedudenhoeffer.com/steve/executus/deliver"
|
||||
)
|
||||
|
||||
// ErrCriticKill is the cancellation cause the executor stamps on a run the
|
||||
// critic kills, so a critic kill surfaces as a distinct "killed" status (vs a
|
||||
// backstop "timeout" or a caller "cancelled"). A host CriticHandle signals a
|
||||
// kill via KillCause(); the executor wraps that reason with this sentinel.
|
||||
var ErrCriticKill = errors.New("run: critic killed the run")
|
||||
|
||||
// Ports are the host seams the run executor consumes. Every field is nil-safe:
|
||||
// a light host passes the zero Ports and gets a bounded, in-memory run with no
|
||||
// persistence, audit, budget, critic, delegation, or delivery — which is
|
||||
@@ -35,6 +42,20 @@ type Ports struct {
|
||||
// Delivery is where the run's output + artifacts go. nil = the caller
|
||||
// reads the Result in-process (the light-host default).
|
||||
Delivery deliver.Delivery
|
||||
// InputFiles persists non-image input attachments (audio, PDF, binary)
|
||||
// carried on Invocation.InputFiles into a host file store under run scope,
|
||||
// returning file_ids the agent can hand to a worker tool. nil = input files
|
||||
// are silently ignored (the run still proceeds, text-only). The bytes are
|
||||
// never inlined into the model context — the LLM can't read raw audio/binary.
|
||||
InputFiles InputFileStager
|
||||
}
|
||||
|
||||
// InputFileStager persists a single non-image input attachment into a host file
|
||||
// store under run scope and returns a file_id the run can reference. It is the
|
||||
// seam mort's skill FileStorage (and any host blob store) implements so the
|
||||
// kernel can stage Invocation.InputFiles without importing a storage layer.
|
||||
type InputFileStager interface {
|
||||
StageInputFile(ctx context.Context, runID, agentID, name, mime string, content []byte) (fileID string, err error)
|
||||
}
|
||||
|
||||
// RunInfo describes a run at start time — the attribution a recorder/critic
|
||||
@@ -123,8 +144,10 @@ type Critic interface {
|
||||
// methods (the critic battery's handle guards its state with a mutex).
|
||||
type CriticHandle interface {
|
||||
// RecordStep / RecordToolStart keep the critic's activity clock fresh so a
|
||||
// healthy-but-slow run is not mistaken for a hang.
|
||||
RecordStep(iter int)
|
||||
// healthy-but-slow run is not mistaken for a hang. RecordStep also carries the
|
||||
// completed step's model response (nil-safe) so the critic's Trace can show
|
||||
// what the agent actually produced, not just an iteration count.
|
||||
RecordStep(iter int, resp *llm.Response)
|
||||
RecordToolStart(name, args string)
|
||||
// Steer returns any messages the critic wants injected into the loop (a
|
||||
// nudge), drained before each step — matches majordomo agent.WithSteer.
|
||||
@@ -137,6 +160,12 @@ type CriticHandle interface {
|
||||
// healthy-but-long run's iteration budget mid-flight. Return <= 0 to defer to
|
||||
// the run's base MaxIterations.
|
||||
MaxSteps() int
|
||||
// KillCause returns a non-nil reason iff the critic has decided to KILL this
|
||||
// run (as opposed to letting the hard-deadline backstop expire). The executor
|
||||
// reads it when the deadline passes: non-nil → cancel the run with
|
||||
// ErrCriticKill (status "killed"); nil → the backstop expired naturally
|
||||
// (status "timeout"). Hosts that never distinguish the two may return nil.
|
||||
KillCause() error
|
||||
// Stop ends monitoring when the run finishes.
|
||||
Stop()
|
||||
}
|
||||
|
||||
@@ -0,0 +1,88 @@
|
||||
package run
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log/slog"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
||||
)
|
||||
|
||||
// runPostRun invokes a SessionToolFactory's PostRun hook with panic isolation:
|
||||
// a PostRun panic (or a slow artifact build that the hook mishandles) must not
|
||||
// fail an otherwise-successful run — artifacts are best-effort, the agent's text
|
||||
// output is the source of truth.
|
||||
func runPostRun(ctx context.Context,
|
||||
hook func(context.Context, []llm.Message, string, error) *tool.PostRunResult,
|
||||
transcript []llm.Message, output string, runErr error) (prr *tool.PostRunResult) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
slog.Error("run: PostRun hook panicked; no artifacts produced", "panic", r)
|
||||
prr = nil
|
||||
}
|
||||
}()
|
||||
return hook(ctx, transcript, output, runErr)
|
||||
}
|
||||
|
||||
// steerMailbox is a thread-safe queue of messages a session tool (via
|
||||
// tool.Invocation.AttachImages) wants injected into the agent loop before its
|
||||
// next step — the same WithSteer mechanism the critic uses for nudges, exposed
|
||||
// to ordinary tools so they can show the model content (e.g. a rendered
|
||||
// preview) it must SEE, not just be told about.
|
||||
type steerMailbox struct {
|
||||
mu sync.Mutex
|
||||
msgs []llm.Message
|
||||
}
|
||||
|
||||
func (m *steerMailbox) push(msg llm.Message) {
|
||||
m.mu.Lock()
|
||||
m.msgs = append(m.msgs, msg)
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
// drain returns and clears the queued messages (nil when empty).
|
||||
func (m *steerMailbox) drain() []llm.Message {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
if len(m.msgs) == 0 {
|
||||
return nil
|
||||
}
|
||||
out := m.msgs
|
||||
m.msgs = nil
|
||||
return out
|
||||
}
|
||||
|
||||
// runSession implements tool.AgentSession over a steer mailbox: AttachImages
|
||||
// queues a user-role multimodal message the agent loop injects before its next
|
||||
// step. Replaces legacy agentkit's Agent.AttachImages — majordomo's *agent.Agent
|
||||
// is immutable mid-run, so mutation flows through the run-scoped steer mailbox.
|
||||
type runSession struct{ mailbox *steerMailbox }
|
||||
|
||||
func (s *runSession) AttachImages(text string, images ...llm.ImagePart) {
|
||||
parts := make([]llm.Part, 0, len(images)+1)
|
||||
if strings.TrimSpace(text) != "" {
|
||||
parts = append(parts, llm.Text(text))
|
||||
}
|
||||
for _, img := range images {
|
||||
parts = append(parts, img)
|
||||
}
|
||||
if len(parts) == 0 {
|
||||
return
|
||||
}
|
||||
s.mailbox.push(llm.UserParts(parts...))
|
||||
}
|
||||
|
||||
// safeCleanup runs a SessionTools.Cleanup with panic isolation, so a misbehaving
|
||||
// teardown (temp-dir removal, handle close) can't clobber an otherwise-successful
|
||||
// run via the executor's top-level recover.
|
||||
func safeCleanup(fn func()) {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
slog.Error("run: session Cleanup panicked", "panic", r)
|
||||
}
|
||||
}()
|
||||
fn()
|
||||
}
|
||||
@@ -0,0 +1,94 @@
|
||||
package run_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/executus/run"
|
||||
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
||||
)
|
||||
|
||||
// TestSessionToolFactoryPostRun: a SessionToolFactory's PostRun hook produces an
|
||||
// artifact (from the run output + transcript) that lands on Result.PostRunResult,
|
||||
// and its Cleanup is deferred.
|
||||
func TestSessionToolFactoryPostRun(t *testing.T) {
|
||||
fp := fake.New("fake")
|
||||
fp.Enqueue("m", fake.Reply("hello artifacts"))
|
||||
m, _ := fp.Model("m")
|
||||
|
||||
cleanupCalled := false
|
||||
inv := tool.Invocation{
|
||||
RunID: "r1",
|
||||
SessionToolFactory: func(_ tool.AgentSession) tool.SessionTools {
|
||||
return tool.SessionTools{
|
||||
PostRun: func(_ context.Context, transcript []llm.Message, output string, _ error) *tool.PostRunResult {
|
||||
return &tool.PostRunResult{
|
||||
Artifacts: []tool.Artifact{{Name: "out.txt", MimeType: "text/plain", Data: []byte(output)}},
|
||||
Metadata: map[string]any{"transcript_len": len(transcript)},
|
||||
}
|
||||
},
|
||||
Cleanup: func() { cleanupCalled = true },
|
||||
}
|
||||
},
|
||||
}
|
||||
ex := run.New(run.Config{
|
||||
Registry: tool.NewRegistry(),
|
||||
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
|
||||
})
|
||||
res := ex.Run(context.Background(), run.RunnableAgent{ModelTier: "m"}, inv, "go")
|
||||
if res.Err != nil {
|
||||
t.Fatalf("run error: %v", res.Err)
|
||||
}
|
||||
if res.PostRunResult == nil {
|
||||
t.Fatal("Result.PostRunResult is nil — PostRun hook not invoked / not attached")
|
||||
}
|
||||
if n := len(res.PostRunResult.Artifacts); n != 1 {
|
||||
t.Fatalf("artifacts = %d, want 1", n)
|
||||
}
|
||||
a := res.PostRunResult.Artifacts[0]
|
||||
if a.Name != "out.txt" || string(a.Data) != "hello artifacts" {
|
||||
t.Errorf("artifact = {%q, %q}", a.Name, string(a.Data))
|
||||
}
|
||||
if tl, _ := res.PostRunResult.Metadata["transcript_len"].(int); tl < 1 {
|
||||
t.Errorf("transcript not passed to PostRun (len=%d)", tl)
|
||||
}
|
||||
if !cleanupCalled {
|
||||
t.Error("Cleanup was not deferred/called")
|
||||
}
|
||||
}
|
||||
|
||||
// TestSessionToolFactoryAddsTool: tools the factory returns join the run's
|
||||
// toolbox and are callable by the model.
|
||||
func TestSessionToolFactoryAddsTool(t *testing.T) {
|
||||
fp := fake.New("fake")
|
||||
fp.Enqueue("m",
|
||||
fake.ReplyWith(llm.Response{ToolCalls: []llm.ToolCall{{ID: "c1", Name: "render", Arguments: []byte(`{}`)}}}),
|
||||
fake.Reply("rendered"),
|
||||
)
|
||||
m, _ := fp.Model("m")
|
||||
|
||||
toolCalled := false
|
||||
renderTool := llm.DefineTool("render", "render a preview",
|
||||
func(_ context.Context, _ struct{}) (any, error) { toolCalled = true; return "ok", nil })
|
||||
inv := tool.Invocation{
|
||||
RunID: "r2",
|
||||
SessionToolFactory: func(_ tool.AgentSession) tool.SessionTools {
|
||||
return tool.SessionTools{Tools: []llm.Tool{renderTool}}
|
||||
},
|
||||
}
|
||||
ex := run.New(run.Config{
|
||||
Registry: tool.NewRegistry(),
|
||||
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
|
||||
})
|
||||
res := ex.Run(context.Background(),
|
||||
run.RunnableAgent{ModelTier: "m", MaxIterations: 5}, inv, "go")
|
||||
if res.Err != nil {
|
||||
t.Fatalf("run error: %v", res.Err)
|
||||
}
|
||||
if !toolCalled {
|
||||
t.Error("session-factory tool was not added to the toolbox / not called")
|
||||
}
|
||||
}
|
||||
+4
-3
@@ -154,9 +154,10 @@ type ContinuationContext struct {
|
||||
|
||||
// InputFile is a non-image file the user supplied with a run (audio,
|
||||
// etc.). The executor stages it into the file store under run scope and
|
||||
// surfaces its file_id to the agent. Name is a safe base name (no path
|
||||
// separators) suitable for /workspace/<name>; MimeType is the resolved
|
||||
// content type; Data is the raw bytes.
|
||||
// surfaces its file_id to the agent. Name may be an untrusted attachment
|
||||
// filename — the executor reduces it to a safe base name (stripping path
|
||||
// separators + control chars) before staging or exposing it as
|
||||
// /workspace/<name>; MimeType is the resolved content type; Data is the raw bytes.
|
||||
type InputFile struct {
|
||||
Name string
|
||||
MimeType string
|
||||
|
||||
Reference in New Issue
Block a user