Compare commits
24 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 0acaa8c9a5 | |||
| a35c176b42 | |||
| 1cf46c9954 | |||
| 56baac758d | |||
| 5779035722 | |||
| 1a2a2364ec | |||
| c08ce47fa6 | |||
| 784d5d7ce4 | |||
| 4e179259de | |||
| 82a816ae29 | |||
| be4bbbcad5 | |||
| 390e6cf905 | |||
| 1a1d5e417b | |||
| f3bd43b726 | |||
| 306d575c31 | |||
| 4ba83ab905 | |||
| a103cc5e9f | |||
| 4d28cd6e2c | |||
| dcaefff756 | |||
| 97154395e6 | |||
| 4aa06f652e | |||
| 43b2471737 | |||
| 0c80679719 | |||
| 9d41987b0e |
@@ -1,11 +1,8 @@
|
|||||||
# Gadfly — agentic adversarial PR reviewer (https://gitea.stevedudenhoeffer.com/steve/gadfly).
|
# Gadfly adversarial review — subscribes to steve/gadfly's reusable workflow and
|
||||||
#
|
# INHERITS its default swarm. This stub holds only the triggers, the actor gate,
|
||||||
# Runs the published Gadfly image (pinned to an immutable :sha- tag — act_runner
|
# secret forwarding, and the allow-list; the swarm config (models, lenses,
|
||||||
# caches :latest, and this build is what carries foreman provider-type support)
|
# concurrency, timeouts) lives centrally in gadfly's review-reusable.yml so it is
|
||||||
# as a specialist swarm and posts
|
# tuned in ONE place. Advisory only — never blocks a merge.
|
||||||
# ONE consolidated review comment as gitea-actions. Advisory only — never blocks a
|
|
||||||
# merge. This reviews executus PRs with 3 ollama-cloud models (3-lens suite). Gadfly
|
|
||||||
# is a simple system — findings are advisory; always double-check before acting.
|
|
||||||
|
|
||||||
name: Adversarial Review (Gadfly)
|
name: Adversarial Review (Gadfly)
|
||||||
|
|
||||||
@@ -32,54 +29,26 @@ concurrency:
|
|||||||
jobs:
|
jobs:
|
||||||
review:
|
review:
|
||||||
# Security: only trusted users may trigger a secret-bearing run via a PR
|
# Security: only trusted users may trigger a secret-bearing run via a PR
|
||||||
# comment (pull_request + workflow_dispatch are already trusted). Mirrors
|
# comment (pull_request + workflow_dispatch are already trusted). Mirrors the
|
||||||
# GADFLY_ALLOWED_USERS, the in-container belt-and-suspenders check.
|
# allowed_users input below (the in-container belt-and-suspenders check) — both
|
||||||
|
# lists must stay in sync; a workflow if: can't read a workflow_call input.
|
||||||
if: >-
|
if: >-
|
||||||
github.event_name != 'issue_comment'
|
github.event_name != 'issue_comment'
|
||||||
|| (github.event.issue.pull_request
|
|| (github.event.issue.pull_request
|
||||||
&& (github.actor == 'steve'
|
&& (github.actor == 'steve'
|
||||||
|| github.actor == 'fizi'
|
|| github.actor == 'fizi'
|
||||||
|| github.actor == 'dazed'))
|
|| github.actor == 'dazed'))
|
||||||
runs-on: ubuntu-latest
|
# Tracks gadfly's v1 release tag — a curated pointer re-moved on each release
|
||||||
# 3 cloud models, all concurrent, 3-lens suite. ~12 min typical.
|
# (unlike @main, which moves on every push). Central swarm tuning propagates
|
||||||
timeout-minutes: 30
|
# here automatically; the tradeoff vs a full sha pin is that v1 is mutable.
|
||||||
steps:
|
uses: steve/gadfly/.gitea/workflows/review-reusable.yml@v1
|
||||||
- uses: docker://gitea.stevedudenhoeffer.com/steve/gadfly:sha-d7f364d
|
# Least privilege: forward only the review secrets (not `secrets: inherit`,
|
||||||
env:
|
# which would expose every repo secret). GITEA_TOKEN is the automatic token.
|
||||||
GITEA_API: ${{ github.server_url }}/api/v1/repos/${{ github.repository }}
|
secrets:
|
||||||
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
OLLAMA_CLOUD_API_KEY: ${{ secrets.OLLAMA_CLOUD_API_KEY }}
|
||||||
OLLAMA_CLOUD_API_KEY: ${{ secrets.OLLAMA_CLOUD_API_KEY }}
|
CLAUDE_CODE_OAUTH_TOKEN: ${{ secrets.CLAUDE_CODE_OAUTH_TOKEN }}
|
||||||
# executus uses CLOUD MODELS ONLY. The local Macs (m1/m5) were dropped:
|
GADFLY_FINDINGS_URL: ${{ secrets.GADFLY_FINDINGS_URL }}
|
||||||
# on a P2-review measurement they took 26–29 min (with lens timeouts)
|
GADFLY_FINDINGS_TOKEN: ${{ secrets.GADFLY_FINDINGS_TOKEN }}
|
||||||
# and contributed ZERO real findings — the two cloud models found every
|
with:
|
||||||
# genuine bug in 6–12 min. Cloud-only is faster AND higher-signal.
|
# Consumer-specific allow-list; everything else is inherited.
|
||||||
# 3 cloud models. Concurrency now lives in the LENSES, not the models:
|
allowed_users: "steve,fizi,dazed"
|
||||||
# one model runs at a time (PROVIDER_CONCURRENCY=1) with its 3 lenses
|
|
||||||
# concurrent (PROVIDER_LENS_CONCURRENCY=3). So the first model's
|
|
||||||
# comment lands sooner and each model finishes a bit faster, at the
|
|
||||||
# cost of the other two models' comments arriving in series after it.
|
|
||||||
GADFLY_MODELS: "minimax-m3:cloud,deepseek-v4-flash:cloud,glm-5.2:cloud"
|
|
||||||
GADFLY_PROVIDER_CONCURRENCY: "ollama-cloud=1"
|
|
||||||
GADFLY_PROVIDER_LENS_CONCURRENCY: "ollama-cloud=3"
|
|
||||||
# Default => the 3-lens suite (security, correctness, error-handling).
|
|
||||||
# Set the repo var GADFLY_SPECIALISTS to override (csv / "all" / "auto").
|
|
||||||
GADFLY_SPECIALISTS: ${{ vars.GADFLY_SPECIALISTS || 'security,correctness,error-handling' }}
|
|
||||||
# Per-lens deadline + bounded steps so the slow local models stay sane.
|
|
||||||
GADFLY_TIMEOUT_SECS: "600"
|
|
||||||
GADFLY_MAX_STEPS: "14"
|
|
||||||
# Allow-list for the comment trigger (mirrors the job-level if: guard).
|
|
||||||
GADFLY_ALLOWED_USERS: "steve,fizi,dazed"
|
|
||||||
# --- findings telemetry: POST runs + findings to the gadfly-reports store ---
|
|
||||||
# Advisory & off unless GADFLY_FINDINGS_URL is set; failures only log to
|
|
||||||
# stderr and never affect the review. GADFLY_REPO / GADFLY_PR are derived
|
|
||||||
# in-container; the URL + token are user-scope secrets.
|
|
||||||
GADFLY_FINDINGS_URL: ${{ secrets.GADFLY_FINDINGS_URL }}
|
|
||||||
GADFLY_FINDINGS_TOKEN: ${{ secrets.GADFLY_FINDINGS_TOKEN }}
|
|
||||||
# --- event context (leave as-is) ---
|
|
||||||
EVENT_NAME: ${{ github.event_name }}
|
|
||||||
PR: ${{ github.event.pull_request.number || github.event.issue.number || github.event.inputs.pr_number }}
|
|
||||||
PR_BRANCH: ${{ github.head_ref }}
|
|
||||||
IS_DRAFT: ${{ github.event.pull_request.draft }}
|
|
||||||
COMMENT_BODY: ${{ github.event.comment.body }}
|
|
||||||
COMMENT_ID: ${{ github.event.comment.id }}
|
|
||||||
ACTOR: ${{ github.actor }}
|
|
||||||
|
|||||||
@@ -47,9 +47,10 @@ CORE (majordomo + stdlib):
|
|||||||
toolbox + majordomo loop + compaction +
|
toolbox + majordomo loop + compaction +
|
||||||
run-bounding (V10 detached timeout) + step/
|
run-bounding (V10 detached timeout) + step/
|
||||||
audit observers + Budget gate; RunnableAgent
|
audit observers + Budget gate; RunnableAgent
|
||||||
DTO + nil-safe run.Ports. Follow-ups: wire
|
DTO + nil-safe run.Ports. Palette delegation +
|
||||||
Critic/Checkpointer/PaletteSource/Delivery,
|
Critic (monitor/deadline/steer) + Delivery
|
||||||
Phases, and the no-tools direct path [P2]
|
WIRED. Follow-ups: Checkpointer (needs a
|
||||||
|
majordomo msg-history hook), Phases [C0c]
|
||||||
dispatchguard/ loop/depth/fan-out caps [P0 ✓]
|
dispatchguard/ loop/depth/fan-out caps [P0 ✓]
|
||||||
pendingattach/ attachment dedupe [P0 ✓]
|
pendingattach/ attachment dedupe [P0 ✓]
|
||||||
tool/ registry + 3-stage permissions + ssrf [P1 ✓]
|
tool/ registry + 3-stage permissions + ssrf [P1 ✓]
|
||||||
|
|||||||
+46
-10
@@ -10,13 +10,17 @@
|
|||||||
// Mort plugs its LLM critic-agent in as an Escalator; ExtendOnce is the
|
// Mort plugs its LLM critic-agent in as an Escalator; ExtendOnce is the
|
||||||
// zero-dependency default.
|
// zero-dependency default.
|
||||||
//
|
//
|
||||||
// NOTE: the executor's call into run.Ports.Critic is a P2 follow-up; this
|
// The executor wires run.Ports.Critic (C0b): it feeds the handle activity,
|
||||||
// battery provides the seam + impl ahead of that wiring.
|
// binds the run context to its extendable Deadline, drains its Steer, and polls
|
||||||
|
// MaxSteps each step so an Escalator can also raise a long run's step ceiling
|
||||||
|
// (Decision.RaiseStepsBy).
|
||||||
package critic
|
package critic
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
|
"math"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -36,10 +40,11 @@ type Progress struct {
|
|||||||
// Decision is the Escalator's verdict for a stalled run. Zero value = do
|
// Decision is the Escalator's verdict for a stalled run. Zero value = do
|
||||||
// nothing (let the hard backstop eventually kill a truly hung run).
|
// nothing (let the hard backstop eventually kill a truly hung run).
|
||||||
type Decision struct {
|
type Decision struct {
|
||||||
Nudge []llm.Message // injected before the agent's next turn (a steer)
|
Nudge []llm.Message // injected before the agent's next turn (a steer)
|
||||||
ExtendBy time.Duration // push the hard deadline out by this much
|
ExtendBy time.Duration // push the hard deadline out by this much
|
||||||
Kill bool // cancel the run now
|
RaiseStepsBy int // raise the run's tool-dispatch step ceiling by this
|
||||||
KillReason string
|
Kill bool // cancel the run now
|
||||||
|
KillReason string
|
||||||
}
|
}
|
||||||
|
|
||||||
// Escalator decides what to do when a run crosses its soft timeout. It is
|
// Escalator decides what to do when a run crosses its soft timeout. It is
|
||||||
@@ -136,6 +141,7 @@ func (s *System) Monitor(ctx context.Context, info run.RunInfo, softTimeout time
|
|||||||
now: s.now,
|
now: s.now,
|
||||||
lastActivity: now,
|
lastActivity: now,
|
||||||
deadline: now.Add(time.Duration(float64(softTimeout) * s.backstopMul)),
|
deadline: now.Add(time.Duration(float64(softTimeout) * s.backstopMul)),
|
||||||
|
maxSteps: info.MaxIterations, // base ceiling; an Escalator may RaiseStepsBy
|
||||||
stopCh: make(chan struct{}),
|
stopCh: make(chan struct{}),
|
||||||
}
|
}
|
||||||
go h.watch(ctx, check)
|
go h.watch(ctx, check)
|
||||||
@@ -155,13 +161,17 @@ type handle struct {
|
|||||||
deadline time.Time
|
deadline time.Time
|
||||||
steer []llm.Message
|
steer []llm.Message
|
||||||
iterations int
|
iterations int
|
||||||
|
maxSteps int // current tool-dispatch ceiling (base MaxIterations, raised by RaiseStepsBy)
|
||||||
lastTool string
|
lastTool string
|
||||||
killed bool // sticky: once an Escalator kills, no later decision un-kills it
|
killed bool // sticky: once an Escalator kills, no later decision un-kills it
|
||||||
|
killCause error // non-nil once killed; surfaced via KillCause for "killed" status
|
||||||
stopped bool
|
stopped bool
|
||||||
stopCh chan struct{}
|
stopCh chan struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (h *handle) RecordStep(iter int) {
|
func (h *handle) RecordStep(iter int, _ *llm.Response) {
|
||||||
|
// This battery's Progress tracks iteration count + activity, not per-step
|
||||||
|
// payload, so the response is unused here; a richer Escalator could record it.
|
||||||
h.mu.Lock()
|
h.mu.Lock()
|
||||||
h.iterations = iter
|
h.iterations = iter
|
||||||
h.lastActivity = h.now()
|
h.lastActivity = h.now()
|
||||||
@@ -192,6 +202,18 @@ func (h *handle) Deadline() time.Time {
|
|||||||
return h.deadline
|
return h.deadline
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (h *handle) MaxSteps() int {
|
||||||
|
h.mu.Lock()
|
||||||
|
defer h.mu.Unlock()
|
||||||
|
return h.maxSteps
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *handle) KillCause() error {
|
||||||
|
h.mu.Lock()
|
||||||
|
defer h.mu.Unlock()
|
||||||
|
return h.killCause
|
||||||
|
}
|
||||||
|
|
||||||
func (h *handle) Stop() {
|
func (h *handle) Stop() {
|
||||||
h.mu.Lock()
|
h.mu.Lock()
|
||||||
if !h.stopped {
|
if !h.stopped {
|
||||||
@@ -254,8 +276,13 @@ func (h *handle) tick(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
if d.Kill {
|
if d.Kill {
|
||||||
h.killed = true
|
h.killed = true
|
||||||
h.deadline = h.now() // immediate hard deadline → executor cancels
|
reason := d.KillReason
|
||||||
return // ignore any Nudge/ExtendBy paired with a Kill
|
if reason == "" {
|
||||||
|
reason = "critic killed the run"
|
||||||
|
}
|
||||||
|
h.killCause = errors.New(reason) // surfaced via KillCause → "killed" status
|
||||||
|
h.deadline = h.now() // immediate hard deadline → executor cancels
|
||||||
|
return // ignore any Nudge/ExtendBy paired with a Kill
|
||||||
}
|
}
|
||||||
if len(d.Nudge) > 0 {
|
if len(d.Nudge) > 0 {
|
||||||
h.steer = append(h.steer, d.Nudge...)
|
h.steer = append(h.steer, d.Nudge...)
|
||||||
@@ -263,4 +290,13 @@ func (h *handle) tick(ctx context.Context) {
|
|||||||
if d.ExtendBy > 0 {
|
if d.ExtendBy > 0 {
|
||||||
h.deadline = h.deadline.Add(d.ExtendBy)
|
h.deadline = h.deadline.Add(d.ExtendBy)
|
||||||
}
|
}
|
||||||
|
if d.RaiseStepsBy > 0 {
|
||||||
|
// Overflow-safe: a buggy Escalator returning a huge delta must not wrap
|
||||||
|
// maxSteps negative (which the executor would read as "defer to base").
|
||||||
|
if d.RaiseStepsBy > math.MaxInt-h.maxSteps {
|
||||||
|
h.maxSteps = math.MaxInt
|
||||||
|
} else {
|
||||||
|
h.maxSteps += d.RaiseStepsBy
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -51,7 +51,7 @@ func TestMonitorEscalatesOncePerIdlePeriodAndExtends(t *testing.T) {
|
|||||||
t.Error("deadline should have been extended past the original")
|
t.Error("deadline should have been extended past the original")
|
||||||
}
|
}
|
||||||
// A fresh step re-arms; another idle period escalates again.
|
// A fresh step re-arms; another idle period escalates again.
|
||||||
h.RecordStep(1)
|
h.RecordStep(1, nil)
|
||||||
time.Sleep(60 * time.Millisecond)
|
time.Sleep(60 * time.Millisecond)
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
c2 := calls
|
c2 := calls
|
||||||
|
|||||||
@@ -125,6 +125,7 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD
|
|||||||
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
|
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
|
||||||
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
|
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
|
||||||
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
|
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
|
||||||
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
|
||||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||||
|
|||||||
+124
@@ -0,0 +1,124 @@
|
|||||||
|
package run
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/agent"
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||||
|
)
|
||||||
|
|
||||||
|
// criticDeadlineCheck is how often the deadline-watch goroutine polls the
|
||||||
|
// critic's hard deadline. Small relative to any realistic soft timeout.
|
||||||
|
const criticDeadlineCheck = time.Second
|
||||||
|
|
||||||
|
// criticBinding wires a CriticHandle into a run: the executor forwards activity
|
||||||
|
// (steps + tool starts) to it, binds the run's hard cancellation to the critic's
|
||||||
|
// extendable deadline, and exposes the critic's Steer messages as an agent
|
||||||
|
// RunOption. All methods are nil-safe so the executor can call them
|
||||||
|
// unconditionally when no critic is configured.
|
||||||
|
type criticBinding struct {
|
||||||
|
h CriticHandle
|
||||||
|
}
|
||||||
|
|
||||||
|
// startCritic begins critic monitoring for this run when one is configured and
|
||||||
|
// the agent enables it. It launches a goroutine that cancels runCtx (via
|
||||||
|
// cancelCause) the moment the critic's hard deadline passes — the critic may
|
||||||
|
// extend that deadline, so a healthy-but-slow run is given room while a hung one
|
||||||
|
// is killed. When the deadline passes because the critic KILLED the run
|
||||||
|
// (KillCause() != nil), the cancellation cause is ErrCriticKill (→ status
|
||||||
|
// "killed"); when the backstop simply expired, it is context.DeadlineExceeded (→
|
||||||
|
// "timeout"). Returns (nil, no-op stop) when there is no critic. The caller MUST
|
||||||
|
// defer the returned stop.
|
||||||
|
func (e *Executor) startCritic(runCtx context.Context, cancelCause context.CancelCauseFunc, ra RunnableAgent, info RunInfo) (*criticBinding, func()) {
|
||||||
|
noop := func() {}
|
||||||
|
if e.cfg.Ports.Critic == nil || !ra.Critic.Enabled {
|
||||||
|
return nil, noop
|
||||||
|
}
|
||||||
|
soft := e.cfg.Defaults.CriticSoftTimeout
|
||||||
|
if soft <= 0 {
|
||||||
|
soft = 90 * time.Second // defensive: withFallbacks normally guarantees >0
|
||||||
|
}
|
||||||
|
h := e.cfg.Ports.Critic.Monitor(runCtx, info, soft)
|
||||||
|
if h == nil {
|
||||||
|
return nil, noop
|
||||||
|
}
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
// A host CriticHandle.Deadline() that panics must not crash the process
|
||||||
|
// (this runs on its own goroutine, so the executor's top-level recover
|
||||||
|
// can't catch it). Log-free best-effort: just stop watching.
|
||||||
|
defer func() { _ = recover() }()
|
||||||
|
t := time.NewTicker(criticDeadlineCheck)
|
||||||
|
defer t.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
return
|
||||||
|
case <-runCtx.Done():
|
||||||
|
return
|
||||||
|
case <-t.C:
|
||||||
|
// A zero deadline = no hard cap (not yet set); otherwise cancel
|
||||||
|
// once we're at or past it, distinguishing an explicit kill from a
|
||||||
|
// natural backstop expiry so the run gets the right status.
|
||||||
|
if d := h.Deadline(); !d.IsZero() && !time.Now().Before(d) {
|
||||||
|
if cause := h.KillCause(); cause != nil {
|
||||||
|
cancelCause(fmt.Errorf("%w: %s", ErrCriticKill, cause.Error()))
|
||||||
|
} else {
|
||||||
|
cancelCause(context.DeadlineExceeded)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return &criticBinding{h: h}, func() {
|
||||||
|
close(done)
|
||||||
|
h.Stop()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (b *criticBinding) recordStep(iter int, resp *llm.Response) {
|
||||||
|
if b != nil {
|
||||||
|
b.h.RecordStep(iter, resp)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// recordToolStart forwards a tool call to the critic. NOTE: majordomo's step
|
||||||
|
// observer only fires AFTER an iteration completes, so this currently lands
|
||||||
|
// post-tool, not at dispatch — the activity clock is refreshed once per
|
||||||
|
// iteration, not mid-tool. A single very long tool call (e.g. a 30-min render)
|
||||||
|
// therefore won't refresh the clock until it returns; a host that runs such
|
||||||
|
// tools should feed interim progress to its Critic (mort's InstallProgressBridge
|
||||||
|
// pattern). A true pre-dispatch refresh needs a majordomo hook (follow-up).
|
||||||
|
func (b *criticBinding) recordToolStart(name, args string) {
|
||||||
|
if b != nil {
|
||||||
|
b.h.RecordToolStart(name, args)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// maxStepsOption returns the agent step-ceiling Option. With no critic it's a
|
||||||
|
// fixed WithMaxSteps(base); with a critic it's a DYNAMIC WithMaxStepsFunc that
|
||||||
|
// polls the handle each step (so the critic can raise a long run's budget),
|
||||||
|
// falling back to base when the handle defers (MaxSteps() <= 0).
|
||||||
|
func (b *criticBinding) maxStepsOption(base int) agent.Option {
|
||||||
|
if b == nil {
|
||||||
|
return agent.WithMaxSteps(base)
|
||||||
|
}
|
||||||
|
return agent.WithMaxStepsFunc(func() int {
|
||||||
|
if n := b.h.MaxSteps(); n > 0 {
|
||||||
|
return n
|
||||||
|
}
|
||||||
|
return base
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// drainSteer returns the critic's queued steer messages (nil-safe), so the
|
||||||
|
// executor can merge them with the session steer mailbox into one WithSteer.
|
||||||
|
func (b *criticBinding) drainSteer() []llm.Message {
|
||||||
|
if b == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return b.h.Steer()
|
||||||
|
}
|
||||||
@@ -0,0 +1,128 @@
|
|||||||
|
package run_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"sync"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/run"
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
||||||
|
)
|
||||||
|
|
||||||
|
type fakeCritic struct{ h *fakeCriticHandle }
|
||||||
|
|
||||||
|
func (c *fakeCritic) Monitor(_ context.Context, _ run.RunInfo, _ time.Duration) run.CriticHandle {
|
||||||
|
return c.h
|
||||||
|
}
|
||||||
|
|
||||||
|
type fakeCriticHandle struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
steps, tools, stops int
|
||||||
|
steered int
|
||||||
|
maxSteps int // 0 => defer to the run's base MaxIterations
|
||||||
|
killCause error // non-nil simulates a critic kill
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *fakeCriticHandle) RecordStep(int, *llm.Response) { h.mu.Lock(); h.steps++; h.mu.Unlock() }
|
||||||
|
func (h *fakeCriticHandle) KillCause() error {
|
||||||
|
h.mu.Lock()
|
||||||
|
defer h.mu.Unlock()
|
||||||
|
return h.killCause
|
||||||
|
}
|
||||||
|
func (h *fakeCriticHandle) RecordToolStart(string, string) {
|
||||||
|
h.mu.Lock()
|
||||||
|
h.tools++
|
||||||
|
h.mu.Unlock()
|
||||||
|
}
|
||||||
|
func (h *fakeCriticHandle) Steer() []llm.Message { h.mu.Lock(); h.steered++; h.mu.Unlock(); return nil }
|
||||||
|
func (h *fakeCriticHandle) Deadline() time.Time { return time.Time{} } // no hard deadline
|
||||||
|
func (h *fakeCriticHandle) MaxSteps() int { h.mu.Lock(); defer h.mu.Unlock(); return h.maxSteps }
|
||||||
|
func (h *fakeCriticHandle) Stop() { h.mu.Lock(); h.stops++; h.mu.Unlock() }
|
||||||
|
|
||||||
|
// TestCriticRaisesStepCeiling: a critic returning a higher MaxSteps lets the agent
|
||||||
|
// run PAST its base MaxIterations (the dynamic step ceiling). With base=1 and no
|
||||||
|
// critic the run would hit ErrMaxSteps after the first tool-dispatch step; the
|
||||||
|
// critic raises it to 5 so the run completes.
|
||||||
|
func TestCriticRaisesStepCeiling(t *testing.T) {
|
||||||
|
h := &fakeCriticHandle{maxSteps: 5}
|
||||||
|
fp := fake.New("fake")
|
||||||
|
fp.Enqueue("m",
|
||||||
|
// two tool-call steps (unknown tool → tolerated error results), then answer
|
||||||
|
fake.ReplyWith(llm.Response{ToolCalls: []llm.ToolCall{{ID: "c1", Name: "noop", Arguments: []byte(`{}`)}}}),
|
||||||
|
fake.ReplyWith(llm.Response{ToolCalls: []llm.ToolCall{{ID: "c2", Name: "noop", Arguments: []byte(`{}`)}}}),
|
||||||
|
fake.Reply("done after 2 tool steps"),
|
||||||
|
)
|
||||||
|
m, _ := fp.Model("m")
|
||||||
|
ex := run.New(run.Config{
|
||||||
|
Registry: tool.NewRegistry(),
|
||||||
|
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
|
||||||
|
Ports: run.Ports{Critic: &fakeCritic{h: h}},
|
||||||
|
// large soft timeout so the deadline-watch never interferes in the test
|
||||||
|
Defaults: run.Defaults{CriticSoftTimeout: time.Hour},
|
||||||
|
})
|
||||||
|
res := ex.Run(context.Background(),
|
||||||
|
run.RunnableAgent{Name: "x", ModelTier: "m", MaxIterations: 1, Critic: run.CriticConfig{Enabled: true}},
|
||||||
|
tool.Invocation{RunID: "r"}, "go")
|
||||||
|
if res.Err != nil {
|
||||||
|
t.Fatalf("critic raised the ceiling to 5, run should complete past base=1: %v", res.Err)
|
||||||
|
}
|
||||||
|
if res.Output != "done after 2 tool steps" {
|
||||||
|
t.Errorf("output = %q", res.Output)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestCriticWired: an agent with Critic.Enabled gets monitored — Monitor returns
|
||||||
|
// a handle the executor feeds (RecordStep), drains (Steer), and stops.
|
||||||
|
func TestCriticWired(t *testing.T) {
|
||||||
|
h := &fakeCriticHandle{}
|
||||||
|
fp := fake.New("fake")
|
||||||
|
fp.Enqueue("m", fake.Reply("done"))
|
||||||
|
m, _ := fp.Model("m")
|
||||||
|
ex := run.New(run.Config{
|
||||||
|
Registry: tool.NewRegistry(),
|
||||||
|
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
|
||||||
|
Ports: run.Ports{Critic: &fakeCritic{h: h}},
|
||||||
|
})
|
||||||
|
res := ex.Run(context.Background(),
|
||||||
|
run.RunnableAgent{Name: "watched", ModelTier: "m", Critic: run.CriticConfig{Enabled: true}},
|
||||||
|
tool.Invocation{RunID: "r"}, "go")
|
||||||
|
if res.Err != nil {
|
||||||
|
t.Fatalf("run error: %v", res.Err)
|
||||||
|
}
|
||||||
|
h.mu.Lock()
|
||||||
|
defer h.mu.Unlock()
|
||||||
|
if h.steps < 1 {
|
||||||
|
t.Errorf("critic should have seen >=1 step, got %d", h.steps)
|
||||||
|
}
|
||||||
|
if h.steered < 1 {
|
||||||
|
t.Errorf("critic Steer should be drained at least once, got %d", h.steered)
|
||||||
|
}
|
||||||
|
if h.stops != 1 {
|
||||||
|
t.Errorf("critic Stop should be called exactly once, got %d", h.stops)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestCriticDisabledNotMonitored: Critic.Enabled=false → Monitor never called.
|
||||||
|
func TestCriticDisabledNotMonitored(t *testing.T) {
|
||||||
|
h := &fakeCriticHandle{}
|
||||||
|
fp := fake.New("fake")
|
||||||
|
fp.Enqueue("m", fake.Reply("done"))
|
||||||
|
m, _ := fp.Model("m")
|
||||||
|
ex := run.New(run.Config{
|
||||||
|
Registry: tool.NewRegistry(),
|
||||||
|
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
|
||||||
|
Ports: run.Ports{Critic: &fakeCritic{h: h}},
|
||||||
|
})
|
||||||
|
ex.Run(context.Background(),
|
||||||
|
run.RunnableAgent{Name: "x", ModelTier: "m"}, // Critic.Enabled=false
|
||||||
|
tool.Invocation{RunID: "r"}, "go")
|
||||||
|
h.mu.Lock()
|
||||||
|
defer h.mu.Unlock()
|
||||||
|
if h.stops != 0 || h.steps != 0 {
|
||||||
|
t.Errorf("disabled critic should not be monitored: steps=%d stops=%d", h.steps, h.stops)
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,114 @@
|
|||||||
|
package run_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/deliver"
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/run"
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
||||||
|
)
|
||||||
|
|
||||||
|
type recordingDelivery struct {
|
||||||
|
target deliver.Target
|
||||||
|
output string
|
||||||
|
errored error
|
||||||
|
delivers int
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *recordingDelivery) Deliver(_ context.Context, t deliver.Target, output string, _ []deliver.Artifact) (string, error) {
|
||||||
|
d.target, d.output, d.delivers = t, output, d.delivers+1
|
||||||
|
return "msg-1", nil
|
||||||
|
}
|
||||||
|
func (d *recordingDelivery) DeliverError(_ context.Context, t deliver.Target, e error) error {
|
||||||
|
d.target, d.errored = t, e
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestDeliveryWired(t *testing.T) {
|
||||||
|
d := &recordingDelivery{}
|
||||||
|
fp := fake.New("fake")
|
||||||
|
fp.Enqueue("m", fake.Reply("the output"))
|
||||||
|
m, _ := fp.Model("m")
|
||||||
|
ex := run.New(run.Config{
|
||||||
|
Registry: tool.NewRegistry(),
|
||||||
|
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
|
||||||
|
Ports: run.Ports{Delivery: d},
|
||||||
|
})
|
||||||
|
// With a delivery target, the executor posts the output.
|
||||||
|
ex.Run(context.Background(),
|
||||||
|
run.RunnableAgent{Name: "x", ModelTier: "m"},
|
||||||
|
tool.Invocation{RunID: "r", DeliveryKind: "channel", DeliveryID: "chan-9"}, "go")
|
||||||
|
if d.delivers != 1 || d.output != "the output" || d.target.ID != "chan-9" || d.target.Kind != "channel" {
|
||||||
|
t.Fatalf("delivery wrong: %+v out=%q", d.target, d.output)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestNoDeliveryWithoutTarget(t *testing.T) {
|
||||||
|
d := &recordingDelivery{}
|
||||||
|
fp := fake.New("fake")
|
||||||
|
fp.Enqueue("m", fake.Reply("x"))
|
||||||
|
m, _ := fp.Model("m")
|
||||||
|
ex := run.New(run.Config{
|
||||||
|
Registry: tool.NewRegistry(),
|
||||||
|
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
|
||||||
|
Ports: run.Ports{Delivery: d},
|
||||||
|
})
|
||||||
|
// No DeliveryID → executor delivers nothing (caller reads Result.Output).
|
||||||
|
ex.Run(context.Background(),
|
||||||
|
run.RunnableAgent{Name: "x", ModelTier: "m"},
|
||||||
|
tool.Invocation{RunID: "r"}, "go")
|
||||||
|
if d.delivers != 0 {
|
||||||
|
t.Errorf("no target should mean no delivery, got %d", d.delivers)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestNoDeliveryOnEarlyResolveError: an error BEFORE the run starts (model
|
||||||
|
// resolve) returns before delivery is reached — neither Deliver nor DeliverError
|
||||||
|
// fires. (Delivery covers run OUTCOMES, not pre-run setup failures.)
|
||||||
|
func TestNoDeliveryOnEarlyResolveError(t *testing.T) {
|
||||||
|
d := &recordingDelivery{}
|
||||||
|
ex := run.New(run.Config{
|
||||||
|
Registry: tool.NewRegistry(),
|
||||||
|
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) {
|
||||||
|
return ctx, nil, errors.New("resolve boom")
|
||||||
|
},
|
||||||
|
Ports: run.Ports{Delivery: d},
|
||||||
|
})
|
||||||
|
ex.Run(context.Background(),
|
||||||
|
run.RunnableAgent{Name: "x", ModelTier: "m"},
|
||||||
|
tool.Invocation{RunID: "r", DeliveryKind: "channel", DeliveryID: "chan-9"}, "go")
|
||||||
|
if d.delivers != 0 || d.errored != nil {
|
||||||
|
t.Errorf("early resolve failure should neither Deliver nor DeliverError: delivers=%d errored=%v", d.delivers, d.errored)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestDeliverErrorOnRunFailure: an in-loop run failure (the model errors) routes
|
||||||
|
// through DeliverError with the run error.
|
||||||
|
func TestDeliverErrorOnRunFailure(t *testing.T) {
|
||||||
|
d := &recordingDelivery{}
|
||||||
|
fp := fake.New("fake")
|
||||||
|
fp.Enqueue("m", fake.Step{Err: errors.New("model boom")}) // model errors mid-run
|
||||||
|
m, _ := fp.Model("m")
|
||||||
|
ex := run.New(run.Config{
|
||||||
|
Registry: tool.NewRegistry(),
|
||||||
|
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
|
||||||
|
Ports: run.Ports{Delivery: d},
|
||||||
|
})
|
||||||
|
res := ex.Run(context.Background(),
|
||||||
|
run.RunnableAgent{Name: "x", ModelTier: "m"},
|
||||||
|
tool.Invocation{RunID: "r", DeliveryKind: "channel", DeliveryID: "chan-9"}, "go")
|
||||||
|
if res.Err == nil {
|
||||||
|
t.Fatal("expected a run error")
|
||||||
|
}
|
||||||
|
if d.delivers != 0 {
|
||||||
|
t.Errorf("a failed run should not Deliver (success path), got %d", d.delivers)
|
||||||
|
}
|
||||||
|
if d.errored == nil || d.target.ID != "chan-9" {
|
||||||
|
t.Errorf("a failed run with a target should DeliverError to chan-9, got errored=%v target=%+v", d.errored, d.target)
|
||||||
|
}
|
||||||
|
}
|
||||||
+172
-22
@@ -4,12 +4,14 @@ import (
|
|||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"gitea.stevedudenhoeffer.com/steve/majordomo/agent"
|
"gitea.stevedudenhoeffer.com/steve/majordomo/agent"
|
||||||
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||||
|
|
||||||
"gitea.stevedudenhoeffer.com/steve/executus/compact"
|
"gitea.stevedudenhoeffer.com/steve/executus/compact"
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/deliver"
|
||||||
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -27,6 +29,7 @@ type Defaults struct {
|
|||||||
MaxConsecutiveToolErrors int // loop guard; default 3
|
MaxConsecutiveToolErrors int // loop guard; default 3
|
||||||
MaxSameToolCallRepeats int // retry-storm guard; default 3
|
MaxSameToolCallRepeats int // retry-storm guard; default 3
|
||||||
CompactionThresholdRatio float64 // fraction of model context to compact at; default 0.7
|
CompactionThresholdRatio float64 // fraction of model context to compact at; default 0.7
|
||||||
|
CriticSoftTimeout time.Duration // idle window before the critic wakes; default 90s
|
||||||
}
|
}
|
||||||
|
|
||||||
func (d Defaults) withFallbacks() Defaults {
|
func (d Defaults) withFallbacks() Defaults {
|
||||||
@@ -48,6 +51,9 @@ func (d Defaults) withFallbacks() Defaults {
|
|||||||
if d.CompactionThresholdRatio <= 0 {
|
if d.CompactionThresholdRatio <= 0 {
|
||||||
d.CompactionThresholdRatio = 0.7
|
d.CompactionThresholdRatio = 0.7
|
||||||
}
|
}
|
||||||
|
if d.CriticSoftTimeout <= 0 {
|
||||||
|
d.CriticSoftTimeout = 90 * time.Second
|
||||||
|
}
|
||||||
return d
|
return d
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -96,13 +102,26 @@ type Result struct {
|
|||||||
Steps []tool.Step
|
Steps []tool.Step
|
||||||
Usage llm.Usage
|
Usage llm.Usage
|
||||||
Err error
|
Err error
|
||||||
|
// PostRunResult carries artifacts produced by a SessionToolFactory's PostRun
|
||||||
|
// hook (rendered images, files). nil when no factory was set or PostRun
|
||||||
|
// returned nil. The host delivers these (e.g. mort's chat API / Discord).
|
||||||
|
PostRunResult *tool.PostRunResult
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run executes ra with the given invocation + input and returns the Result. It
|
// Run executes ra with the given invocation + input and returns the Result. It
|
||||||
// never propagates a panic; failures surface in Result.Err.
|
// never propagates a panic; failures surface in Result.Err (a top-level recover
|
||||||
func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocation, input string) Result {
|
// converts any panic — including from a host Port — into a run error).
|
||||||
|
func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocation, input string) (res Result) {
|
||||||
started := time.Now()
|
started := time.Now()
|
||||||
res := Result{RunID: inv.RunID}
|
res = Result{RunID: inv.RunID}
|
||||||
|
// Enforce the no-panic contract: a panic anywhere in the run (incl. a host
|
||||||
|
// Critic/Audit/Palette callback on the main goroutine) becomes Result.Err
|
||||||
|
// rather than unwinding into the caller.
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
res.Err = fmt.Errorf("run.Executor: recovered panic: %v", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
tier := ra.ModelTier
|
tier := ra.ModelTier
|
||||||
if tier == "" {
|
if tier == "" {
|
||||||
@@ -141,25 +160,33 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
|||||||
|
|
||||||
// Audit start (optional). The recorder satisfies RunTally; stamp it on the
|
// Audit start (optional). The recorder satisfies RunTally; stamp it on the
|
||||||
// invocation so a self-status tool can read live progress.
|
// invocation so a self-status tool can read live progress.
|
||||||
|
info := RunInfo{
|
||||||
|
RunID: inv.RunID,
|
||||||
|
SubjectID: ra.ID,
|
||||||
|
Name: ra.Name,
|
||||||
|
CallerID: inv.CallerID,
|
||||||
|
ChannelID: inv.ChannelID,
|
||||||
|
ParentRunID: inv.ParentRunID,
|
||||||
|
Inputs: inv.SkillInputs,
|
||||||
|
StartedAt: started,
|
||||||
|
MaxIterations: maxIter,
|
||||||
|
}
|
||||||
var rec RunRecorder
|
var rec RunRecorder
|
||||||
var stateAcc *RunStateAccessor
|
var stateAcc *RunStateAccessor
|
||||||
if e.cfg.Ports.Audit != nil {
|
if e.cfg.Ports.Audit != nil {
|
||||||
rec = e.cfg.Ports.Audit.StartRun(ctx, RunInfo{
|
rec = e.cfg.Ports.Audit.StartRun(ctx, info)
|
||||||
RunID: inv.RunID,
|
|
||||||
SubjectID: ra.ID,
|
|
||||||
Name: ra.Name,
|
|
||||||
CallerID: inv.CallerID,
|
|
||||||
ChannelID: inv.ChannelID,
|
|
||||||
ParentRunID: inv.ParentRunID,
|
|
||||||
Inputs: inv.SkillInputs,
|
|
||||||
StartedAt: started,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
if rec != nil {
|
if rec != nil {
|
||||||
stateAcc = NewRunStateAccessor(rec, maxIter, 0, started)
|
stateAcc = NewRunStateAccessor(rec, maxIter, 0, started)
|
||||||
inv.RunState = stateAcc
|
inv.RunState = stateAcc
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Steer mailbox: lets session tools (via inv.AttachImages) feed multimodal
|
||||||
|
// messages into the running conversation before its next step. Created BEFORE
|
||||||
|
// the toolbox build so any tool's handler captures the live AttachImages seam.
|
||||||
|
mailbox := &steerMailbox{}
|
||||||
|
inv.AttachImages = (&runSession{mailbox: mailbox}).AttachImages
|
||||||
|
|
||||||
// Build the toolbox from the agent's low-level tools.
|
// Build the toolbox from the agent's low-level tools.
|
||||||
toolbox, err := e.cfg.Registry.Build(ra.LowLevelTools, inv, tool.Visibility("private"), nil)
|
toolbox, err := e.cfg.Registry.Build(ra.LowLevelTools, inv, tool.Visibility("private"), nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -168,16 +195,66 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
|||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Add skill__/agent__ delegation tools from the agent's palette (nil-safe:
|
||||||
|
// no PaletteSource or empty palette → no delegation tools).
|
||||||
|
if err := addDelegationTools(toolbox, ra, inv, e.cfg.Ports.Palette); err != nil {
|
||||||
|
res.Err = fmt.Errorf("build delegation tools: %w", err)
|
||||||
|
e.finishAudit(ctx, rec, "error", res, started, res.Err)
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
// Per-invocation ExtraTools + a SessionToolFactory's per-run tools, added on
|
||||||
|
// top of the agent's palette. The factory closes over the live session (the
|
||||||
|
// AttachImages mailbox); its PostRun hook (held for after the run) produces
|
||||||
|
// artifacts attached to res.PostRunResult, and its Cleanup is deferred. All
|
||||||
|
// nil-safe.
|
||||||
|
for _, t := range inv.ExtraTools {
|
||||||
|
if err := toolbox.Add(t); err != nil {
|
||||||
|
res.Err = fmt.Errorf("add extra tool: %w", err)
|
||||||
|
e.finishAudit(ctx, rec, "error", res, started, res.Err)
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
}
|
||||||
|
var postRun func(ctx context.Context, transcript []llm.Message, output string, runErr error) *tool.PostRunResult
|
||||||
|
if inv.SessionToolFactory != nil {
|
||||||
|
st := inv.SessionToolFactory(&runSession{mailbox: mailbox})
|
||||||
|
if st.Cleanup != nil {
|
||||||
|
defer safeCleanup(st.Cleanup) // panic-isolated, like runPostRun
|
||||||
|
}
|
||||||
|
for _, t := range st.Tools {
|
||||||
|
if err := toolbox.Add(t); err != nil {
|
||||||
|
res.Err = fmt.Errorf("add session tool: %w", err)
|
||||||
|
e.finishAudit(ctx, rec, "error", res, started, res.Err)
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
}
|
||||||
|
postRun = st.PostRun
|
||||||
|
}
|
||||||
|
|
||||||
// Run context: bound by MaxRuntime, detached from the caller's deadline so a
|
// Run context: bound by MaxRuntime, detached from the caller's deadline so a
|
||||||
// lane/queue wait doesn't eat the run budget (mort's V10 lesson). Caller
|
// lane/queue wait doesn't eat the run budget (mort's V10 lesson). Caller
|
||||||
// cancellation still propagates via MergeCancellation. Created BEFORE the
|
// cancellation still propagates via MergeCancellation. Created BEFORE the
|
||||||
// step observer so the observer forwards the merged run context (not a
|
// step observer so the observer forwards the merged run context (not a
|
||||||
// possibly-cancelled caller ctx) to OnStep consumers.
|
// possibly-cancelled caller ctx) to OnStep consumers.
|
||||||
runCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), maxRuntime)
|
// MaxRuntime stays a WithTimeout so its DeadlineExceeded propagates through the
|
||||||
defer cancel()
|
// child chain (→ "timeout"), preserving the run's-own-timeout vs caller-cancel
|
||||||
|
// distinction. A NESTED cause-carrying layer lets a critic kill surface as a
|
||||||
|
// distinct "killed" without disturbing that: only an ErrCriticKill cause is
|
||||||
|
// consulted in statusFor; a generic run error or a caller cancel is classified
|
||||||
|
// by the run error itself.
|
||||||
|
timeoutCtx, cancelTimeout := context.WithTimeout(context.WithoutCancel(ctx), maxRuntime)
|
||||||
|
defer cancelTimeout()
|
||||||
|
runCtx, cancelCause := context.WithCancelCause(timeoutCtx)
|
||||||
|
defer cancelCause(nil)
|
||||||
runCtx, mergeCancel := MergeCancellation(runCtx, ctx)
|
runCtx, mergeCancel := MergeCancellation(runCtx, ctx)
|
||||||
defer mergeCancel()
|
defer mergeCancel()
|
||||||
|
|
||||||
|
// Critic (optional): monitors the run for a stall, can nudge/extend/kill via
|
||||||
|
// its host Escalator. Its hard deadline is bound to runCtx (cancel on pass).
|
||||||
|
// nil-safe: no-op when no critic is configured or the agent doesn't enable it.
|
||||||
|
critic, stopCritic := e.startCritic(runCtx, cancelCause, ra, info)
|
||||||
|
defer stopCritic()
|
||||||
|
|
||||||
// Step instrumentation: accumulate Result.Steps + fire inv.OnStep, feed the
|
// Step instrumentation: accumulate Result.Steps + fire inv.OnStep, feed the
|
||||||
// audit recorder, and keep the live iteration counter fresh. majordomo's
|
// audit recorder, and keep the live iteration counter fresh. majordomo's
|
||||||
// step observer hands us each completed iteration; we zip the model's tool
|
// step observer hands us each completed iteration; we zip the model's tool
|
||||||
@@ -192,6 +269,7 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
|||||||
if rec != nil {
|
if rec != nil {
|
||||||
rec.OnStep(s.Index, s.Response)
|
rec.OnStep(s.Index, s.Response)
|
||||||
}
|
}
|
||||||
|
critic.recordStep(s.Index, s.Response) // keep the critic's activity clock fresh + carry the step payload
|
||||||
var calls []llm.ToolCall
|
var calls []llm.ToolCall
|
||||||
if s.Response != nil {
|
if s.Response != nil {
|
||||||
calls = s.Response.ToolCalls
|
calls = s.Response.ToolCalls
|
||||||
@@ -202,6 +280,7 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
|||||||
}
|
}
|
||||||
for i := 0; i < n; i++ {
|
for i := 0; i < n; i++ {
|
||||||
call, r := calls[i], s.Results[i]
|
call, r := calls[i], s.Results[i]
|
||||||
|
critic.recordToolStart(call.Name, string(call.Arguments))
|
||||||
emitter.toolStart(runCtx, call.Name, call.Arguments)
|
emitter.toolStart(runCtx, call.Name, call.Arguments)
|
||||||
emitter.toolEnd(runCtx, call, r.Content, r.IsError)
|
emitter.toolEnd(runCtx, call, r.Content, r.IsError)
|
||||||
if rec != nil {
|
if rec != nil {
|
||||||
@@ -212,7 +291,10 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
|||||||
|
|
||||||
opts := []agent.Option{
|
opts := []agent.Option{
|
||||||
agent.WithToolbox(toolbox),
|
agent.WithToolbox(toolbox),
|
||||||
agent.WithMaxSteps(maxIter),
|
// Step ceiling: a fixed WithMaxSteps(maxIter) normally, but when a critic is
|
||||||
|
// active it owns a DYNAMIC ceiling (WithMaxStepsFunc) so it can raise a
|
||||||
|
// healthy-but-long run's budget mid-flight. Falls back to maxIter.
|
||||||
|
critic.maxStepsOption(maxIter),
|
||||||
agent.WithToolErrorLimits(e.cfg.Defaults.MaxConsecutiveToolErrors, e.cfg.Defaults.MaxSameToolCallRepeats),
|
agent.WithToolErrorLimits(e.cfg.Defaults.MaxConsecutiveToolErrors, e.cfg.Defaults.MaxSameToolCallRepeats),
|
||||||
agent.WithStepObserver(stepObserver),
|
agent.WithStepObserver(stepObserver),
|
||||||
}
|
}
|
||||||
@@ -236,9 +318,12 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
|||||||
}
|
}
|
||||||
|
|
||||||
ag := agent.New(model, e.systemPrompt(ra), opts...)
|
ag := agent.New(model, e.systemPrompt(ra), opts...)
|
||||||
runRes, runErr := ag.Run(runCtx, input)
|
// One WithSteer drains BOTH the session mailbox (a tool's AttachImages) and
|
||||||
|
// the critic's nudges before each step.
|
||||||
|
steer := func() []llm.Message { return append(mailbox.drain(), critic.drainSteer()...) }
|
||||||
|
runRes, runErr := runAgent(runCtx, ag, input, inv.Images, agent.WithSteer(steer))
|
||||||
|
|
||||||
status := statusFor(runErr)
|
status := statusFor(runCtx, runErr)
|
||||||
if runRes != nil {
|
if runRes != nil {
|
||||||
res.Output = runRes.Output
|
res.Output = runRes.Output
|
||||||
res.Usage = runRes.Usage
|
res.Usage = runRes.Usage
|
||||||
@@ -246,20 +331,43 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
|||||||
res.Steps = emitter.snapshot()
|
res.Steps = emitter.snapshot()
|
||||||
res.Err = runErr
|
res.Err = runErr
|
||||||
|
|
||||||
|
// PostRun: hand the SessionToolFactory's hook the full transcript (populated
|
||||||
|
// even on partial results) so it can produce artifacts. Best-effort +
|
||||||
|
// panic-isolated — a PostRun failure never fails an otherwise-successful run.
|
||||||
|
if postRun != nil {
|
||||||
|
var transcript []llm.Message
|
||||||
|
if runRes != nil {
|
||||||
|
transcript = runRes.Messages
|
||||||
|
}
|
||||||
|
// Detach from the caller's ctx: a finished/cancelled caller must not abort
|
||||||
|
// artifact production (the hook owns its own bounding, per its contract).
|
||||||
|
res.PostRunResult = runPostRun(detach(ctx), postRun, transcript, res.Output, runErr)
|
||||||
|
}
|
||||||
|
|
||||||
e.finishAudit(ctx, rec, status, res, started, runErr)
|
e.finishAudit(ctx, rec, status, res, started, runErr)
|
||||||
if e.cfg.Ports.Budget != nil {
|
if e.cfg.Ports.Budget != nil {
|
||||||
e.cfg.Ports.Budget.Commit(detach(ctx), inv.CallerID, time.Since(started).Seconds())
|
e.cfg.Ports.Budget.Commit(detach(ctx), inv.CallerID, time.Since(started).Seconds())
|
||||||
}
|
}
|
||||||
|
e.deliver(ctx, inv, res, runErr)
|
||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
// statusFor maps a run error to a RunStats.Status, distinguishing a deadline
|
// statusFor maps a run error to a RunStats.Status, distinguishing a critic kill
|
||||||
// (timeout) and a cancellation (cancelled — caller cancel or shutdown) from a
|
// (killed), a deadline (timeout), and a cancellation (cancelled — caller cancel
|
||||||
// generic error so audit consumers can tell them apart.
|
// or shutdown) from a generic error so audit consumers can tell them apart. The
|
||||||
func statusFor(runErr error) string {
|
// run context's cancellation cause carries the distinction (ErrCriticKill /
|
||||||
|
// DeadlineExceeded), since ctx.Err() alone only reports Canceled.
|
||||||
|
func statusFor(runCtx context.Context, runErr error) string {
|
||||||
switch {
|
switch {
|
||||||
case runErr == nil:
|
case runErr == nil:
|
||||||
return "ok"
|
return "ok"
|
||||||
|
// Only the kill is recovered from the cancellation cause — a critic kill
|
||||||
|
// surfaces as a plain Canceled run error, so without this it'd read as
|
||||||
|
// "cancelled". Everything else is classified by the run error itself, so a
|
||||||
|
// genuine run error is never relabeled just because the context was later
|
||||||
|
// cancelled, and a caller cancel/deadline stays "cancelled" (not "timeout").
|
||||||
|
case errors.Is(context.Cause(runCtx), ErrCriticKill):
|
||||||
|
return "killed"
|
||||||
case errors.Is(runErr, context.DeadlineExceeded):
|
case errors.Is(runErr, context.DeadlineExceeded):
|
||||||
return "timeout"
|
return "timeout"
|
||||||
case errors.Is(runErr, context.Canceled):
|
case errors.Is(runErr, context.Canceled):
|
||||||
@@ -308,6 +416,23 @@ func (e *Executor) compactionThreshold(tier string) int {
|
|||||||
return int(float64(max) * e.cfg.Defaults.CompactionThresholdRatio)
|
return int(float64(max) * e.cfg.Defaults.CompactionThresholdRatio)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// deliver posts the run's output (or error) via run.Ports.Delivery when both a
|
||||||
|
// Delivery and a target (inv.DeliveryID) are set. No target = the caller reads
|
||||||
|
// Result.Output itself (the synchronous default). Best-effort + detached: a
|
||||||
|
// delivery failure must not change the run's outcome.
|
||||||
|
func (e *Executor) deliver(ctx context.Context, inv tool.Invocation, res Result, runErr error) {
|
||||||
|
if e.cfg.Ports.Delivery == nil || inv.DeliveryID == "" {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
target := deliver.Target{Kind: inv.DeliveryKind, ID: inv.DeliveryID}
|
||||||
|
dctx := detach(ctx)
|
||||||
|
if runErr != nil {
|
||||||
|
_ = e.cfg.Ports.Delivery.DeliverError(dctx, target, runErr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
_, _ = e.cfg.Ports.Delivery.Deliver(dctx, target, res.Output, nil)
|
||||||
|
}
|
||||||
|
|
||||||
// detach derives a bounded cleanup context off ctx, detached from its
|
// detach derives a bounded cleanup context off ctx, detached from its
|
||||||
// cancellation, for post-run writes. The cancel is intentionally not returned;
|
// cancellation, for post-run writes. The cancel is intentionally not returned;
|
||||||
// CleanupContextTimeout bounds the lifetime.
|
// CleanupContextTimeout bounds the lifetime.
|
||||||
@@ -316,3 +441,28 @@ func detach(ctx context.Context) context.Context {
|
|||||||
_ = cancel // bounded by the timeout; nothing to cancel early
|
_ = cancel // bounded by the timeout; nothing to cancel early
|
||||||
return c
|
return c
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// runAgent dispatches the majordomo agent loop. majordomo's Run takes a text-only
|
||||||
|
// input arg, so when the invocation carries images they're folded into the first
|
||||||
|
// user message (text + image parts) via WithHistory and Run is called with an
|
||||||
|
// empty input — the model then sees a multimodal opening turn. The image-less path
|
||||||
|
// passes the prompt straight through.
|
||||||
|
//
|
||||||
|
// The text part is omitted when input is blank (image-only run), matching
|
||||||
|
// runSession.AttachImages so no empty TextPart is sent.
|
||||||
|
func runAgent(ctx context.Context, ag *agent.Agent, input string, images []llm.ImagePart, opts ...agent.RunOption) (*agent.Result, error) {
|
||||||
|
if len(images) == 0 {
|
||||||
|
return ag.Run(ctx, input, opts...)
|
||||||
|
}
|
||||||
|
parts := make([]llm.Part, 0, len(images)+1)
|
||||||
|
if strings.TrimSpace(input) != "" {
|
||||||
|
parts = append(parts, llm.Text(input))
|
||||||
|
}
|
||||||
|
for _, img := range images {
|
||||||
|
parts = append(parts, img)
|
||||||
|
}
|
||||||
|
// Copy opts before appending so a caller-supplied backing array is never
|
||||||
|
// mutated/aliased (the variadic slice can have spare capacity).
|
||||||
|
opts = append(opts[:len(opts):len(opts)], agent.WithHistory([]llm.Message{llm.UserParts(parts...)}))
|
||||||
|
return ag.Run(ctx, "", opts...)
|
||||||
|
}
|
||||||
|
|||||||
+20
-7
@@ -148,20 +148,33 @@ func TestExecutorNilModelNoPanic(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestStatusFor maps run errors to RunStats.Status (gadfly F3).
|
// TestStatusFor maps run errors + cancellation cause to RunStats.Status (gadfly F3).
|
||||||
func TestStatusFor(t *testing.T) {
|
func TestStatusFor(t *testing.T) {
|
||||||
|
bg := context.Background()
|
||||||
|
// A context cancelled with the critic-kill cause: ctx.Err() is Canceled, but
|
||||||
|
// context.Cause carries ErrCriticKill → "killed".
|
||||||
|
killCtx, killCancel := context.WithCancelCause(context.Background())
|
||||||
|
killCancel(fmt.Errorf("%w: hung", ErrCriticKill))
|
||||||
|
// A context cancelled with a non-kill cause must NOT relabel a genuine run
|
||||||
|
// error: a real error stays "error" even though the ctx was later cancelled.
|
||||||
|
cancelledCtx, cc := context.WithCancelCause(context.Background())
|
||||||
|
cc(context.DeadlineExceeded)
|
||||||
cases := []struct {
|
cases := []struct {
|
||||||
|
ctx context.Context
|
||||||
err error
|
err error
|
||||||
want string
|
want string
|
||||||
}{
|
}{
|
||||||
{nil, "ok"},
|
{bg, nil, "ok"},
|
||||||
{context.DeadlineExceeded, "timeout"},
|
{bg, context.DeadlineExceeded, "timeout"},
|
||||||
{context.Canceled, "cancelled"},
|
{bg, context.Canceled, "cancelled"},
|
||||||
{fmt.Errorf("wrapped: %w", context.DeadlineExceeded), "timeout"},
|
{bg, fmt.Errorf("wrapped: %w", context.DeadlineExceeded), "timeout"},
|
||||||
{errors.New("boom"), "error"},
|
{bg, errors.New("boom"), "error"},
|
||||||
|
{killCtx, context.Canceled, "killed"},
|
||||||
|
{cancelledCtx, errors.New("boom"), "error"}, // generic error not relabeled by cause
|
||||||
|
{cancelledCtx, context.Canceled, "cancelled"}, // caller cancel stays cancelled, not timeout
|
||||||
}
|
}
|
||||||
for _, c := range cases {
|
for _, c := range cases {
|
||||||
if got := statusFor(c.err); got != c.want {
|
if got := statusFor(c.ctx, c.err); got != c.want {
|
||||||
t.Errorf("statusFor(%v) = %q, want %q", c.err, got, c.want)
|
t.Errorf("statusFor(%v) = %q, want %q", c.err, got, c.want)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,121 @@
|
|||||||
|
package run_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"strings"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/run"
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestExecutorFoldsInitialImages: when the invocation carries Images, they're
|
||||||
|
// folded into the first user message (alongside the prompt text) instead of being
|
||||||
|
// dropped — majordomo's Run input arg is text-only, so the executor seeds the
|
||||||
|
// multimodal opening turn via history.
|
||||||
|
func TestExecutorFoldsInitialImages(t *testing.T) {
|
||||||
|
fp := fake.New("fake")
|
||||||
|
fp.Enqueue("m", fake.Reply("saw the image"))
|
||||||
|
m, _ := fp.Model("m")
|
||||||
|
|
||||||
|
img := llm.ImagePart{MIME: "image/png", Data: []byte("PNGDATA")}
|
||||||
|
inv := tool.Invocation{RunID: "r1", Images: []llm.ImagePart{img}}
|
||||||
|
ex := run.New(run.Config{
|
||||||
|
Registry: tool.NewRegistry(),
|
||||||
|
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
|
||||||
|
})
|
||||||
|
res := ex.Run(context.Background(), run.RunnableAgent{ModelTier: "m"}, inv, "describe this")
|
||||||
|
if res.Err != nil {
|
||||||
|
t.Fatalf("run error: %v", res.Err)
|
||||||
|
}
|
||||||
|
|
||||||
|
calls := fp.Calls()
|
||||||
|
if len(calls) == 0 {
|
||||||
|
t.Fatal("no model calls recorded")
|
||||||
|
}
|
||||||
|
// The text + image must be CO-LOCATED in a single user message (not split
|
||||||
|
// across two), so the model reads them as one multimodal turn.
|
||||||
|
coLocated := false
|
||||||
|
for _, msg := range calls[0].Request.Messages {
|
||||||
|
sawImage, sawText := false, false
|
||||||
|
for _, p := range msg.Parts {
|
||||||
|
switch pp := p.(type) {
|
||||||
|
case llm.ImagePart:
|
||||||
|
if string(pp.Data) == "PNGDATA" {
|
||||||
|
sawImage = true
|
||||||
|
}
|
||||||
|
case llm.TextPart:
|
||||||
|
if strings.Contains(pp.Text, "describe this") {
|
||||||
|
sawText = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if sawImage && sawText {
|
||||||
|
coLocated = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !coLocated {
|
||||||
|
t.Error("image + prompt text were not folded into the SAME user message")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestExecutorImageOnlyNoBlankText: an image-only run (blank prompt) must NOT emit
|
||||||
|
// an empty TextPart — the message carries just the image, matching
|
||||||
|
// runSession.AttachImages's guard.
|
||||||
|
func TestExecutorImageOnlyNoBlankText(t *testing.T) {
|
||||||
|
fp := fake.New("fake")
|
||||||
|
fp.Enqueue("m", fake.Reply("saw it"))
|
||||||
|
m, _ := fp.Model("m")
|
||||||
|
|
||||||
|
inv := tool.Invocation{RunID: "r3", Images: []llm.ImagePart{{MIME: "image/png", Data: []byte("IMG")}}}
|
||||||
|
ex := run.New(run.Config{
|
||||||
|
Registry: tool.NewRegistry(),
|
||||||
|
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
|
||||||
|
})
|
||||||
|
res := ex.Run(context.Background(), run.RunnableAgent{ModelTier: "m"}, inv, " ")
|
||||||
|
if res.Err != nil {
|
||||||
|
t.Fatalf("run error: %v", res.Err)
|
||||||
|
}
|
||||||
|
for _, msg := range fp.Calls()[0].Request.Messages {
|
||||||
|
for _, p := range msg.Parts {
|
||||||
|
if tp, ok := p.(llm.TextPart); ok && strings.TrimSpace(tp.Text) == "" {
|
||||||
|
t.Error("image-only run emitted a blank TextPart")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestExecutorTextOnlyUnchanged: with no Images, the prompt flows through as the
|
||||||
|
// text input (regression guard that the fold path didn't break the common case).
|
||||||
|
func TestExecutorTextOnlyUnchanged(t *testing.T) {
|
||||||
|
fp := fake.New("fake")
|
||||||
|
fp.Enqueue("m", fake.Reply("ok"))
|
||||||
|
m, _ := fp.Model("m")
|
||||||
|
|
||||||
|
ex := run.New(run.Config{
|
||||||
|
Registry: tool.NewRegistry(),
|
||||||
|
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
|
||||||
|
})
|
||||||
|
res := ex.Run(context.Background(), run.RunnableAgent{ModelTier: "m"}, tool.Invocation{RunID: "r2"}, "plain prompt")
|
||||||
|
if res.Err != nil {
|
||||||
|
t.Fatalf("run error: %v", res.Err)
|
||||||
|
}
|
||||||
|
calls := fp.Calls()
|
||||||
|
if len(calls) == 0 {
|
||||||
|
t.Fatal("no model calls recorded")
|
||||||
|
}
|
||||||
|
sawText := false
|
||||||
|
for _, msg := range calls[0].Request.Messages {
|
||||||
|
for _, p := range msg.Parts {
|
||||||
|
if tp, ok := p.(llm.TextPart); ok && strings.Contains(tp.Text, "plain prompt") {
|
||||||
|
sawText = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if !sawText {
|
||||||
|
t.Error("text-only prompt did not reach the model")
|
||||||
|
}
|
||||||
|
}
|
||||||
+102
@@ -0,0 +1,102 @@
|
|||||||
|
package run
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
||||||
|
)
|
||||||
|
|
||||||
|
// addDelegationTools adds a delegation tool to the toolbox for each
|
||||||
|
// SkillPalette / SubAgentPalette entry, backed by the PaletteSource:
|
||||||
|
//
|
||||||
|
// - skill__<name> invokes the named saved skill with structured inputs.
|
||||||
|
// - agent__<name> invokes the named sub-agent with a prompt.
|
||||||
|
//
|
||||||
|
// Each delegated call runs as a CHILD of the current run (parentRunID =
|
||||||
|
// inv.RunID), inheriting the caller + channel. No-op when palette is nil or both
|
||||||
|
// palettes are empty — so an agent with no palette (or a host with no
|
||||||
|
// PaletteSource) simply has no delegation tools, exactly as before.
|
||||||
|
func addDelegationTools(box *llm.Toolbox, ra RunnableAgent, inv tool.Invocation, palette PaletteSource) error {
|
||||||
|
if palette == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
seen := map[string]bool{} // dedupe across both palettes by final tool name
|
||||||
|
for _, name := range ra.SkillPalette {
|
||||||
|
name := name // capture
|
||||||
|
toolName := "skill__" + name
|
||||||
|
if name == "" || seen[toolName] { // skip empty / duplicate palette entries
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
seen[toolName] = true
|
||||||
|
t := llm.DefineTool(
|
||||||
|
toolName,
|
||||||
|
fmt.Sprintf("Delegate the task to the %q skill. Provide its declared inputs.", name),
|
||||||
|
func(ctx context.Context, args skillDelegateArgs) (any, error) {
|
||||||
|
out, _, status, err := palette.InvokeSkill(ctx, inv.CallerID, inv.ChannelID, name, args.Inputs, inv.RunID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, delegationErr("skill", name, out, err)
|
||||||
|
}
|
||||||
|
return delegationResult(name, "skill", out, status), nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err := box.Add(t); err != nil {
|
||||||
|
return fmt.Errorf("add %s: %w", toolName, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for _, name := range ra.SubAgentPalette {
|
||||||
|
name := name // capture
|
||||||
|
toolName := "agent__" + name
|
||||||
|
if name == "" || seen[toolName] {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
seen[toolName] = true
|
||||||
|
t := llm.DefineTool(
|
||||||
|
toolName,
|
||||||
|
fmt.Sprintf("Delegate the task to the %q sub-agent with a natural-language prompt.", name),
|
||||||
|
func(ctx context.Context, args agentDelegateArgs) (any, error) {
|
||||||
|
out, _, status, err := palette.InvokeAgent(ctx, inv.CallerID, inv.ChannelID, name, args.Prompt, inv.RunID, "", "", nil, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, delegationErr("agent", name, out, err)
|
||||||
|
}
|
||||||
|
return delegationResult(name, "agent", out, status), nil
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if err := box.Add(t); err != nil {
|
||||||
|
return fmt.Errorf("add %s: %w", toolName, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// delegationResult surfaces a non-ok child status to the parent agent (so it can
|
||||||
|
// react to a timeout/cancel/budget stop) while still passing the partial output.
|
||||||
|
func delegationResult(name, kind, out, status string) string {
|
||||||
|
if status != "" && status != "ok" {
|
||||||
|
header := fmt.Sprintf("[%s %q ended with status %q]", kind, name, status)
|
||||||
|
if out == "" { // no trailing blank line when there's no body
|
||||||
|
return header
|
||||||
|
}
|
||||||
|
return header + "\n" + out
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
// delegationErr wraps a hard delegation failure, folding in any partial output
|
||||||
|
// the child produced before failing (so it isn't silently lost).
|
||||||
|
func delegationErr(kind, name, partial string, err error) error {
|
||||||
|
if partial != "" {
|
||||||
|
return fmt.Errorf("%s %q failed (partial output: %q): %w", kind, name, partial, err)
|
||||||
|
}
|
||||||
|
return fmt.Errorf("%s %q failed: %w", kind, name, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
type skillDelegateArgs struct {
|
||||||
|
Inputs map[string]any `json:"inputs" description:"Inputs for the skill, matching its declared input schema."`
|
||||||
|
}
|
||||||
|
|
||||||
|
type agentDelegateArgs struct {
|
||||||
|
Prompt string `json:"prompt" description:"The task/prompt to hand the sub-agent."`
|
||||||
|
}
|
||||||
@@ -0,0 +1,125 @@
|
|||||||
|
package run_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/run"
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
||||||
|
)
|
||||||
|
|
||||||
|
// recordingPalette captures the delegation call it received.
|
||||||
|
type recordingPalette struct {
|
||||||
|
gotName, gotCaller, gotParent string
|
||||||
|
gotInputs map[string]any
|
||||||
|
}
|
||||||
|
|
||||||
|
func (p *recordingPalette) ResolveSkill(context.Context, string, string) (string, error) {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
func (p *recordingPalette) InvokeSkill(_ context.Context, callerID, _, name string, inputs map[string]any, parentRunID string) (string, string, string, error) {
|
||||||
|
p.gotName, p.gotCaller, p.gotParent, p.gotInputs = name, callerID, parentRunID, inputs
|
||||||
|
return "the skill output", "child-run-1", "ok", nil
|
||||||
|
}
|
||||||
|
func (p *recordingPalette) ResolveAgent(context.Context, string, string) (string, error) {
|
||||||
|
return "", nil
|
||||||
|
}
|
||||||
|
func (p *recordingPalette) InvokeAgent(context.Context, string, string, string, string, string, string, string, []string, func(context.Context, string, string)) (string, string, string, error) {
|
||||||
|
return "", "", "ok", nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestPaletteDelegation: an agent with a SkillPalette gets a skill__<name> tool;
|
||||||
|
// the model calls it, the executor routes it through run.Ports.Palette as a
|
||||||
|
// child of the current run, and the result flows back into the loop.
|
||||||
|
func TestPaletteDelegation(t *testing.T) {
|
||||||
|
pal := &recordingPalette{}
|
||||||
|
|
||||||
|
fp := fake.New("fake")
|
||||||
|
fp.Enqueue("m",
|
||||||
|
fake.ReplyWith(llm.Response{ToolCalls: []llm.ToolCall{{
|
||||||
|
ID: "c1",
|
||||||
|
Name: "skill__helper",
|
||||||
|
Arguments: json.RawMessage(`{"inputs":{"q":"hi"}}`),
|
||||||
|
}}}),
|
||||||
|
fake.Reply("delegated and done"),
|
||||||
|
)
|
||||||
|
m, err := fp.Model("m")
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ex := run.New(run.Config{
|
||||||
|
Registry: tool.NewRegistry(),
|
||||||
|
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
|
||||||
|
Ports: run.Ports{Palette: pal},
|
||||||
|
})
|
||||||
|
|
||||||
|
res := ex.Run(context.Background(),
|
||||||
|
run.RunnableAgent{ID: "a1", Name: "boss", ModelTier: "m", SkillPalette: []string{"helper"}},
|
||||||
|
tool.Invocation{RunID: "parent-run", CallerID: "caller-7", ChannelID: "chan"},
|
||||||
|
"delegate please")
|
||||||
|
if res.Err != nil {
|
||||||
|
t.Fatalf("run error: %v", res.Err)
|
||||||
|
}
|
||||||
|
if res.Output != "delegated and done" {
|
||||||
|
t.Errorf("output = %q", res.Output)
|
||||||
|
}
|
||||||
|
if pal.gotName != "helper" {
|
||||||
|
t.Errorf("InvokeSkill name = %q, want helper", pal.gotName)
|
||||||
|
}
|
||||||
|
if pal.gotCaller != "caller-7" {
|
||||||
|
t.Errorf("InvokeSkill caller = %q, want caller-7", pal.gotCaller)
|
||||||
|
}
|
||||||
|
if pal.gotParent != "parent-run" {
|
||||||
|
t.Errorf("InvokeSkill parentRunID = %q, want parent-run (child of the current run)", pal.gotParent)
|
||||||
|
}
|
||||||
|
if pal.gotInputs["q"] != "hi" {
|
||||||
|
t.Errorf("InvokeSkill inputs = %+v, want q=hi", pal.gotInputs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestNoPaletteNoDelegationTools: nil PaletteSource → no delegation tools, run
|
||||||
|
// still works (the agent just has no skill__/agent__ tools).
|
||||||
|
func TestNoPaletteNoDelegationTools(t *testing.T) {
|
||||||
|
fp := fake.New("fake")
|
||||||
|
fp.Enqueue("m", fake.Reply("ok"))
|
||||||
|
m, _ := fp.Model("m")
|
||||||
|
ex := run.New(run.Config{
|
||||||
|
Registry: tool.NewRegistry(),
|
||||||
|
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
|
||||||
|
})
|
||||||
|
res := ex.Run(context.Background(),
|
||||||
|
run.RunnableAgent{Name: "x", ModelTier: "m", SkillPalette: []string{"helper"}},
|
||||||
|
tool.Invocation{RunID: "r"}, "hi")
|
||||||
|
if res.Err != nil || res.Output != "ok" {
|
||||||
|
t.Fatalf("nil-palette run failed: %v / %q", res.Err, res.Output)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestDelegationDedupeAndEmptySkip: empty + duplicate palette names are skipped,
|
||||||
|
// not turned into "skill__"/duplicate tools that error at box.Add (gadfly C0).
|
||||||
|
func TestDelegationDedupeAndEmptySkip(t *testing.T) {
|
||||||
|
pal := &recordingPalette{}
|
||||||
|
fp := fake.New("fake")
|
||||||
|
fp.Enqueue("m", fake.Reply("ok"))
|
||||||
|
m, _ := fp.Model("m")
|
||||||
|
ex := run.New(run.Config{
|
||||||
|
Registry: tool.NewRegistry(),
|
||||||
|
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
|
||||||
|
Ports: run.Ports{Palette: pal},
|
||||||
|
})
|
||||||
|
// "" (empty) and a duplicate "helper" must not break the build.
|
||||||
|
res := ex.Run(context.Background(),
|
||||||
|
run.RunnableAgent{Name: "x", ModelTier: "m", SkillPalette: []string{"helper", "", "helper"}},
|
||||||
|
tool.Invocation{RunID: "r"}, "hi")
|
||||||
|
if res.Err != nil {
|
||||||
|
t.Fatalf("empty/duplicate palette names should be skipped, not error: %v", res.Err)
|
||||||
|
}
|
||||||
|
if res.Output != "ok" {
|
||||||
|
t.Fatalf("output = %q", res.Output)
|
||||||
|
}
|
||||||
|
}
|
||||||
+30
-2
@@ -2,6 +2,7 @@ package run
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||||
@@ -9,6 +10,12 @@ import (
|
|||||||
"gitea.stevedudenhoeffer.com/steve/executus/deliver"
|
"gitea.stevedudenhoeffer.com/steve/executus/deliver"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// ErrCriticKill is the cancellation cause the executor stamps on a run the
|
||||||
|
// critic kills, so a critic kill surfaces as a distinct "killed" status (vs a
|
||||||
|
// backstop "timeout" or a caller "cancelled"). A host CriticHandle signals a
|
||||||
|
// kill via KillCause(); the executor wraps that reason with this sentinel.
|
||||||
|
var ErrCriticKill = errors.New("run: critic killed the run")
|
||||||
|
|
||||||
// Ports are the host seams the run executor consumes. Every field is nil-safe:
|
// Ports are the host seams the run executor consumes. Every field is nil-safe:
|
||||||
// a light host passes the zero Ports and gets a bounded, in-memory run with no
|
// a light host passes the zero Ports and gets a bounded, in-memory run with no
|
||||||
// persistence, audit, budget, critic, delegation, or delivery — which is
|
// persistence, audit, budget, critic, delegation, or delivery — which is
|
||||||
@@ -48,6 +55,9 @@ type RunInfo struct {
|
|||||||
ParentRunID string
|
ParentRunID string
|
||||||
Inputs map[string]any
|
Inputs map[string]any
|
||||||
StartedAt time.Time
|
StartedAt time.Time
|
||||||
|
// MaxIterations is the run's base tool-dispatch step ceiling, so a critic can
|
||||||
|
// raise it relative to the baseline (see CriticHandle.MaxSteps).
|
||||||
|
MaxIterations int
|
||||||
}
|
}
|
||||||
|
|
||||||
// RunStats is the terminal roll-up a recorder's Close writes. Mirrors mort's
|
// RunStats is the terminal roll-up a recorder's Close writes. Mirrors mort's
|
||||||
@@ -113,10 +123,17 @@ type Critic interface {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// CriticHandle is the executor's live link to a run's critic.
|
// CriticHandle is the executor's live link to a run's critic.
|
||||||
|
//
|
||||||
|
// Concurrency: the executor calls RecordStep/RecordToolStart/Steer from the run
|
||||||
|
// goroutine while a separate watch goroutine polls Deadline() and the run's end
|
||||||
|
// calls Stop() — so implementations MUST be safe for concurrent use across these
|
||||||
|
// methods (the critic battery's handle guards its state with a mutex).
|
||||||
type CriticHandle interface {
|
type CriticHandle interface {
|
||||||
// RecordStep / RecordToolStart keep the critic's activity clock fresh so a
|
// RecordStep / RecordToolStart keep the critic's activity clock fresh so a
|
||||||
// healthy-but-slow run is not mistaken for a hang.
|
// healthy-but-slow run is not mistaken for a hang. RecordStep also carries the
|
||||||
RecordStep(iter int)
|
// completed step's model response (nil-safe) so the critic's Trace can show
|
||||||
|
// what the agent actually produced, not just an iteration count.
|
||||||
|
RecordStep(iter int, resp *llm.Response)
|
||||||
RecordToolStart(name, args string)
|
RecordToolStart(name, args string)
|
||||||
// Steer returns any messages the critic wants injected into the loop (a
|
// Steer returns any messages the critic wants injected into the loop (a
|
||||||
// nudge), drained before each step — matches majordomo agent.WithSteer.
|
// nudge), drained before each step — matches majordomo agent.WithSteer.
|
||||||
@@ -124,6 +141,17 @@ type CriticHandle interface {
|
|||||||
// Deadline returns the current hard-kill deadline (the critic may extend
|
// Deadline returns the current hard-kill deadline (the critic may extend
|
||||||
// it); the executor binds the run context to it. Zero = no hard deadline.
|
// it); the executor binds the run context to it. Zero = no hard deadline.
|
||||||
Deadline() time.Time
|
Deadline() time.Time
|
||||||
|
// MaxSteps returns the current tool-dispatch step ceiling, polled by the
|
||||||
|
// executor each step (via majordomo WithMaxStepsFunc) so a critic can raise a
|
||||||
|
// healthy-but-long run's iteration budget mid-flight. Return <= 0 to defer to
|
||||||
|
// the run's base MaxIterations.
|
||||||
|
MaxSteps() int
|
||||||
|
// KillCause returns a non-nil reason iff the critic has decided to KILL this
|
||||||
|
// run (as opposed to letting the hard-deadline backstop expire). The executor
|
||||||
|
// reads it when the deadline passes: non-nil → cancel the run with
|
||||||
|
// ErrCriticKill (status "killed"); nil → the backstop expired naturally
|
||||||
|
// (status "timeout"). Hosts that never distinguish the two may return nil.
|
||||||
|
KillCause() error
|
||||||
// Stop ends monitoring when the run finishes.
|
// Stop ends monitoring when the run finishes.
|
||||||
Stop()
|
Stop()
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,88 @@
|
|||||||
|
package run
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"log/slog"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
||||||
|
)
|
||||||
|
|
||||||
|
// runPostRun invokes a SessionToolFactory's PostRun hook with panic isolation:
|
||||||
|
// a PostRun panic (or a slow artifact build that the hook mishandles) must not
|
||||||
|
// fail an otherwise-successful run — artifacts are best-effort, the agent's text
|
||||||
|
// output is the source of truth.
|
||||||
|
func runPostRun(ctx context.Context,
|
||||||
|
hook func(context.Context, []llm.Message, string, error) *tool.PostRunResult,
|
||||||
|
transcript []llm.Message, output string, runErr error) (prr *tool.PostRunResult) {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
slog.Error("run: PostRun hook panicked; no artifacts produced", "panic", r)
|
||||||
|
prr = nil
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
return hook(ctx, transcript, output, runErr)
|
||||||
|
}
|
||||||
|
|
||||||
|
// steerMailbox is a thread-safe queue of messages a session tool (via
|
||||||
|
// tool.Invocation.AttachImages) wants injected into the agent loop before its
|
||||||
|
// next step — the same WithSteer mechanism the critic uses for nudges, exposed
|
||||||
|
// to ordinary tools so they can show the model content (e.g. a rendered
|
||||||
|
// preview) it must SEE, not just be told about.
|
||||||
|
type steerMailbox struct {
|
||||||
|
mu sync.Mutex
|
||||||
|
msgs []llm.Message
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *steerMailbox) push(msg llm.Message) {
|
||||||
|
m.mu.Lock()
|
||||||
|
m.msgs = append(m.msgs, msg)
|
||||||
|
m.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
// drain returns and clears the queued messages (nil when empty).
|
||||||
|
func (m *steerMailbox) drain() []llm.Message {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
if len(m.msgs) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
out := m.msgs
|
||||||
|
m.msgs = nil
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
// runSession implements tool.AgentSession over a steer mailbox: AttachImages
|
||||||
|
// queues a user-role multimodal message the agent loop injects before its next
|
||||||
|
// step. Replaces legacy agentkit's Agent.AttachImages — majordomo's *agent.Agent
|
||||||
|
// is immutable mid-run, so mutation flows through the run-scoped steer mailbox.
|
||||||
|
type runSession struct{ mailbox *steerMailbox }
|
||||||
|
|
||||||
|
func (s *runSession) AttachImages(text string, images ...llm.ImagePart) {
|
||||||
|
parts := make([]llm.Part, 0, len(images)+1)
|
||||||
|
if strings.TrimSpace(text) != "" {
|
||||||
|
parts = append(parts, llm.Text(text))
|
||||||
|
}
|
||||||
|
for _, img := range images {
|
||||||
|
parts = append(parts, img)
|
||||||
|
}
|
||||||
|
if len(parts) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
s.mailbox.push(llm.UserParts(parts...))
|
||||||
|
}
|
||||||
|
|
||||||
|
// safeCleanup runs a SessionTools.Cleanup with panic isolation, so a misbehaving
|
||||||
|
// teardown (temp-dir removal, handle close) can't clobber an otherwise-successful
|
||||||
|
// run via the executor's top-level recover.
|
||||||
|
func safeCleanup(fn func()) {
|
||||||
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
slog.Error("run: session Cleanup panicked", "panic", r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
fn()
|
||||||
|
}
|
||||||
@@ -0,0 +1,94 @@
|
|||||||
|
package run_test
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
|
||||||
|
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/run"
|
||||||
|
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestSessionToolFactoryPostRun: a SessionToolFactory's PostRun hook produces an
|
||||||
|
// artifact (from the run output + transcript) that lands on Result.PostRunResult,
|
||||||
|
// and its Cleanup is deferred.
|
||||||
|
func TestSessionToolFactoryPostRun(t *testing.T) {
|
||||||
|
fp := fake.New("fake")
|
||||||
|
fp.Enqueue("m", fake.Reply("hello artifacts"))
|
||||||
|
m, _ := fp.Model("m")
|
||||||
|
|
||||||
|
cleanupCalled := false
|
||||||
|
inv := tool.Invocation{
|
||||||
|
RunID: "r1",
|
||||||
|
SessionToolFactory: func(_ tool.AgentSession) tool.SessionTools {
|
||||||
|
return tool.SessionTools{
|
||||||
|
PostRun: func(_ context.Context, transcript []llm.Message, output string, _ error) *tool.PostRunResult {
|
||||||
|
return &tool.PostRunResult{
|
||||||
|
Artifacts: []tool.Artifact{{Name: "out.txt", MimeType: "text/plain", Data: []byte(output)}},
|
||||||
|
Metadata: map[string]any{"transcript_len": len(transcript)},
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Cleanup: func() { cleanupCalled = true },
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
ex := run.New(run.Config{
|
||||||
|
Registry: tool.NewRegistry(),
|
||||||
|
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
|
||||||
|
})
|
||||||
|
res := ex.Run(context.Background(), run.RunnableAgent{ModelTier: "m"}, inv, "go")
|
||||||
|
if res.Err != nil {
|
||||||
|
t.Fatalf("run error: %v", res.Err)
|
||||||
|
}
|
||||||
|
if res.PostRunResult == nil {
|
||||||
|
t.Fatal("Result.PostRunResult is nil — PostRun hook not invoked / not attached")
|
||||||
|
}
|
||||||
|
if n := len(res.PostRunResult.Artifacts); n != 1 {
|
||||||
|
t.Fatalf("artifacts = %d, want 1", n)
|
||||||
|
}
|
||||||
|
a := res.PostRunResult.Artifacts[0]
|
||||||
|
if a.Name != "out.txt" || string(a.Data) != "hello artifacts" {
|
||||||
|
t.Errorf("artifact = {%q, %q}", a.Name, string(a.Data))
|
||||||
|
}
|
||||||
|
if tl, _ := res.PostRunResult.Metadata["transcript_len"].(int); tl < 1 {
|
||||||
|
t.Errorf("transcript not passed to PostRun (len=%d)", tl)
|
||||||
|
}
|
||||||
|
if !cleanupCalled {
|
||||||
|
t.Error("Cleanup was not deferred/called")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestSessionToolFactoryAddsTool: tools the factory returns join the run's
|
||||||
|
// toolbox and are callable by the model.
|
||||||
|
func TestSessionToolFactoryAddsTool(t *testing.T) {
|
||||||
|
fp := fake.New("fake")
|
||||||
|
fp.Enqueue("m",
|
||||||
|
fake.ReplyWith(llm.Response{ToolCalls: []llm.ToolCall{{ID: "c1", Name: "render", Arguments: []byte(`{}`)}}}),
|
||||||
|
fake.Reply("rendered"),
|
||||||
|
)
|
||||||
|
m, _ := fp.Model("m")
|
||||||
|
|
||||||
|
toolCalled := false
|
||||||
|
renderTool := llm.DefineTool("render", "render a preview",
|
||||||
|
func(_ context.Context, _ struct{}) (any, error) { toolCalled = true; return "ok", nil })
|
||||||
|
inv := tool.Invocation{
|
||||||
|
RunID: "r2",
|
||||||
|
SessionToolFactory: func(_ tool.AgentSession) tool.SessionTools {
|
||||||
|
return tool.SessionTools{Tools: []llm.Tool{renderTool}}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
ex := run.New(run.Config{
|
||||||
|
Registry: tool.NewRegistry(),
|
||||||
|
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
|
||||||
|
})
|
||||||
|
res := ex.Run(context.Background(),
|
||||||
|
run.RunnableAgent{ModelTier: "m", MaxIterations: 5}, inv, "go")
|
||||||
|
if res.Err != nil {
|
||||||
|
t.Fatalf("run error: %v", res.Err)
|
||||||
|
}
|
||||||
|
if !toolCalled {
|
||||||
|
t.Error("session-factory tool was not added to the toolbox / not called")
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -173,6 +173,13 @@ type Invocation struct {
|
|||||||
CallerID string
|
CallerID string
|
||||||
ChannelID string
|
ChannelID string
|
||||||
GuildID string
|
GuildID string
|
||||||
|
// DeliveryKind / DeliveryID name where the executor posts the run's output
|
||||||
|
// via run.Ports.Delivery — a host-interpreted Target ("channel"/"dm"/
|
||||||
|
// "thread"/...). An empty DeliveryID means the executor delivers nothing
|
||||||
|
// and the caller reads Result.Output itself (the synchronous default; the
|
||||||
|
// `.agent run` canary works this way).
|
||||||
|
DeliveryKind string
|
||||||
|
DeliveryID string
|
||||||
// CallerIsAdmin is true when the caller is a mort admin (Member.Admin).
|
// CallerIsAdmin is true when the caller is a mort admin (Member.Admin).
|
||||||
// Populated by the executor at run dispatch via Bot.GetMember; defaults
|
// Populated by the executor at run dispatch via Bot.GetMember; defaults
|
||||||
// to false on any lookup failure (member not found, DB error, empty
|
// to false on any lookup failure (member not found, DB error, empty
|
||||||
|
|||||||
Reference in New Issue
Block a user