Files
executus/run/executor.go
T
steve cb4c612461
executus CI / test (pull_request) Successful in 1m45s
fix(run): address gadfly review of the critic-deadline PR
All 11 findings were real (3 clusters):

- Failsafe ceiling could pre-empt the critic's backstop (e9c9483f, 9109317b,
  d5a9bf0d, 76ad171e): CriticAbsoluteMax was 6h, but the host's backstop
  (MaxRuntime × multiplier, or its own absolute max) can reach 6h+, so the
  ceiling fired first and reintroduced a premature hard cap. Now CriticAbsoluteMax
  is a 24h RUNAWAY guard set far beyond any realistic backstop (the host clamps
  its own backstop to a much smaller absolute max, e.g. mort's 6h convar), so it
  never pre-empts a healthy supervised run. Comments corrected.

- nil Monitor handle lost the MaxRuntime cap (df016a6f, 9dd42827): a critic-enabled
  run whose host Monitor returned no handle had no deadline-watch and was bounded
  only by the generous ceiling. Added an unsupervised-run failsafe that re-wraps
  runCtx to the nominal MaxRuntime when the critic is enabled but didn't arm.
  New test TestCriticOwnsDeadline_NilHandleFallsBackToMaxRuntime.

- CriticSoftTimeout vestigial / dead fallback (f7764919, 9805bebe, 6864086f,
  b2b11721): the soft trigger is now always the resolved MaxRuntime (> 0), so the
  CriticSoftTimeout field + its startCritic fallback were unreachable. Removed the
  field entirely; the remaining 90s floor is documented as defensive-only.

- DRY (f30ce827): extracted e.criticOwnsDeadline(ra), now the single predicate used
  by both Run and startCritic so they can't drift.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_01Jo75sqmeVPgFUWZQBn179X
2026-06-30 11:32:46 -04:00

630 lines
25 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package run
import (
"context"
"errors"
"fmt"
"log/slog"
"time"
"gitea.stevedudenhoeffer.com/steve/majordomo/agent"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/executus/compact"
"gitea.stevedudenhoeffer.com/steve/executus/deliver"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
// ModelResolver resolves a tier alias or concrete spec to a usable llm.Model
// and an enriched context (for usage attribution). model.ParseModelForContext
// satisfies it.
type ModelResolver func(ctx context.Context, tier string) (context.Context, llm.Model, error)
// Defaults are the executor's fallback caps and loop guards, applied per run
// when the RunnableAgent leaves a field zero.
type Defaults struct {
MaxIterations int // tool-dispatch steps; default 12
MaxRuntime time.Duration // wall-clock per run; default 60s
FallbackTier string // tier when the agent's is empty; default "fast"
MaxConsecutiveToolErrors int // loop guard; default 3
MaxSameToolCallRepeats int // retry-storm guard; default 3
CompactionThresholdRatio float64 // fraction of model context to compact at; default 0.7
// CriticAbsoluteMax is the RUNAWAY ceiling for a critic-OWNED run (Ports.Critic
// set AND the agent enables it). For such a run MaxRuntime is the SOFT trigger,
// not a hard cap, and the critic's own extendable backstop is the normal
// deadline. This ceiling exists ONLY to stop a critic that never advances its
// deadline (a broken host handle) from running forever, so it is deliberately
// set FAR beyond any realistic backstop (default 24h): the host clamps its own
// backstop to a much smaller absolute max (e.g. mort's agents.critic.
// absolute_max_seconds = 6h), so the ceiling never pre-empts a healthy
// supervised run. Keep it well above the host's absolute max. Never shorter than
// the run's MaxRuntime. Non-critic runs ignore it (they keep the literal
// MaxRuntime kill).
CriticAbsoluteMax time.Duration
}
func (d Defaults) withFallbacks() Defaults {
if d.MaxIterations <= 0 {
d.MaxIterations = 12
}
if d.MaxRuntime <= 0 {
d.MaxRuntime = 60 * time.Second
}
if d.FallbackTier == "" {
d.FallbackTier = "fast"
}
if d.MaxConsecutiveToolErrors <= 0 {
d.MaxConsecutiveToolErrors = 3
}
if d.MaxSameToolCallRepeats <= 0 {
d.MaxSameToolCallRepeats = 3
}
if d.CompactionThresholdRatio <= 0 {
d.CompactionThresholdRatio = 0.7
}
if d.CriticAbsoluteMax <= 0 {
d.CriticAbsoluteMax = 24 * time.Hour
}
return d
}
// Config wires an Executor. Registry + Models are required; everything else is
// optional and nil-safe — the zero Config beyond those yields a bounded,
// in-memory run with no persistence/audit/budget/critic/delegation/compaction
// (gadfly's case).
type Config struct {
Registry tool.Registry
Models ModelResolver
Defaults Defaults
Ports Ports
// Compactor mints the per-run context-compaction hook. nil disables
// compaction. ContextTokens resolves a tier's model context-window (for
// the compaction threshold); nil — or a zero return — also disables it.
Compactor compact.CompactorFactory
ContextTokens func(tier string) int
// SystemHeader is an optional platform header prepended to every agent's
// system prompt.
SystemHeader string
}
// Executor runs a RunnableAgent through majordomo's agent loop with the wired
// Ports. Construct with New; safe for concurrent use across runs.
type Executor struct {
cfg Config
}
// New builds an Executor. It panics if Registry or Models is nil — those are
// structural, not runtime, errors.
func New(cfg Config) *Executor {
if cfg.Registry == nil || cfg.Models == nil {
panic("run.New: Registry and Models are required")
}
cfg.Defaults = cfg.Defaults.withFallbacks()
return &Executor{cfg: cfg}
}
// Result is one run's outcome. Err carries the run failure (if any); the other
// fields are populated best-effort even on error (partial output/steps/usage).
type Result struct {
RunID string
Output string
Steps []tool.Step
Usage llm.Usage
Err error
// PostRunResult carries artifacts produced by a SessionToolFactory's PostRun
// hook (rendered images, files). nil when no factory was set or PostRun
// returned nil. The host delivers these (e.g. mort's chat API / Discord).
PostRunResult *tool.PostRunResult
}
// Run executes ra with the given invocation + input and returns the Result. It
// never propagates a panic; failures surface in Result.Err (a top-level recover
// converts any panic — including from a host Port — into a run error).
func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocation, input string) (res Result) {
started := time.Now()
res = Result{RunID: inv.RunID}
// ckpt is the per-run durable checkpointer (resolved below; nil = non-durable).
// checkpointCause yields the run context's cancellation cause once the run
// context exists; nil before then (an early build-error return).
var ckpt Checkpointer
var checkpointCause func() error
// Enforce the no-panic contract: a panic anywhere in the run (incl. a host
// Critic/Audit/Palette callback on the main goroutine) becomes Result.Err
// rather than unwinding into the caller. This defer ALSO finalizes the
// checkpoint on EVERY exit path — panic, an early build-error return (before
// the run loop), or normal completion — so a recovered run's durable record is
// never left dangling (which would loop boot-recovery on a persistent error).
defer func() {
if r := recover(); r != nil {
res.Err = fmt.Errorf("run.Executor: recovered panic: %v", r)
}
var cause error
if checkpointCause != nil {
cause = checkpointCause()
}
finalizeCheckpoint(ctx, ckpt, res.Err, cause)
}()
tier := ra.ModelTier
if tier == "" {
tier = e.cfg.Defaults.FallbackTier
}
maxIter := ra.MaxIterations
if maxIter <= 0 {
maxIter = e.cfg.Defaults.MaxIterations
}
maxRuntime := ra.MaxRuntime
if maxRuntime <= 0 {
maxRuntime = e.cfg.Defaults.MaxRuntime
}
// Budget gate (pre-run): a rejected run makes no model call.
if e.cfg.Ports.Budget != nil {
if err := e.cfg.Ports.Budget.Check(ctx, inv.CallerID); err != nil {
res.Err = err
return res
}
}
// Resolve the model (enriches ctx for usage attribution).
modelCtx, model, err := e.cfg.Models(ctx, tier)
if err != nil {
res.Err = fmt.Errorf("resolve model %q: %w", tier, err)
return res
}
if model == nil {
// A resolver returning (ctx, nil, nil) would otherwise nil-panic inside
// the agent loop; surface it as a clean error (Run never panics out).
res.Err = fmt.Errorf("resolve model %q: resolver returned a nil model", tier)
return res
}
ctx = modelCtx
// Audit start (optional). The recorder satisfies RunTally; stamp it on the
// invocation so a self-status tool can read live progress.
info := RunInfo{
RunID: inv.RunID,
SubjectID: ra.ID,
Name: ra.Name,
CallerID: inv.CallerID,
ChannelID: inv.ChannelID,
GuildID: inv.GuildID,
ParentRunID: inv.ParentRunID,
ModelTier: tier,
Inputs: inv.SkillInputs,
StartedAt: started,
MaxIterations: maxIter,
}
var rec RunRecorder
var stateAcc *RunStateAccessor
if e.cfg.Ports.Audit != nil {
rec = e.cfg.Ports.Audit.StartRun(ctx, info)
}
if rec != nil {
stateAcc = NewRunStateAccessor(rec, maxIter, 0, started)
inv.RunState = stateAcc
}
// Durable recovery (optional): a recovered run carries a ResumeState (prior
// transcript / completed phases) + an existing Checkpointer in ctx so it
// continues on the SAME durable record; a fresh run mints a per-run
// Checkpointer via the factory (which decides durability — nil = non-durable).
// nil-safe throughout.
resume := resumeStateFromContext(ctx)
ckpt = existingCheckpointerFromContext(ctx)
if ckpt == nil && e.cfg.Ports.Checkpointer != nil {
c, cerr := e.cfg.Ports.Checkpointer.Begin(ctx, info)
if cerr != nil {
// Degrade to non-durable (the documented contract) but log it — a
// failing checkpoint store must not fail the run, yet shouldn't be silent.
slog.Warn("run: checkpointer Begin failed; running non-durable",
"run_id", inv.RunID, "error", cerr)
} else {
ckpt = c
}
}
// Steer mailbox: lets session tools (via inv.AttachImages) feed multimodal
// messages into the running conversation before its next step. Created BEFORE
// the toolbox build so any tool's handler captures the live AttachImages seam.
mailbox := &steerMailbox{}
inv.AttachImages = (&runSession{mailbox: mailbox}).AttachImages
// Build the toolbox from the agent's low-level tools.
toolbox, err := e.cfg.Registry.Build(ra.LowLevelTools, inv, tool.Visibility("private"), nil)
if err != nil {
res.Err = fmt.Errorf("build toolbox: %w", err)
e.finishAudit(ctx, rec, "error", res, started, res.Err)
return res
}
// Add skill__/agent__ delegation tools from the agent's palette (nil-safe:
// no PaletteSource or empty palette → no delegation tools).
if err := addDelegationTools(toolbox, ra, inv, e.cfg.Ports.Palette); err != nil {
res.Err = fmt.Errorf("build delegation tools: %w", err)
e.finishAudit(ctx, rec, "error", res, started, res.Err)
return res
}
// Per-invocation ExtraTools + a SessionToolFactory's per-run tools, added on
// top of the agent's palette. The factory closes over the live session (the
// AttachImages mailbox); its PostRun hook (held for after the run) produces
// artifacts attached to res.PostRunResult, and its Cleanup is deferred. All
// nil-safe.
for _, t := range inv.ExtraTools {
if err := toolbox.Add(t); err != nil {
res.Err = fmt.Errorf("add extra tool: %w", err)
e.finishAudit(ctx, rec, "error", res, started, res.Err)
return res
}
}
var postRun func(ctx context.Context, transcript []llm.Message, output string, runErr error) *tool.PostRunResult
if inv.SessionToolFactory != nil {
st := inv.SessionToolFactory(&runSession{mailbox: mailbox})
if st.Cleanup != nil {
defer safeCleanup(st.Cleanup) // panic-isolated, like runPostRun
}
for _, t := range st.Tools {
if err := toolbox.Add(t); err != nil {
res.Err = fmt.Errorf("add session tool: %w", err)
e.finishAudit(ctx, rec, "error", res, started, res.Err)
return res
}
}
postRun = st.PostRun
}
// Run context: detached from the caller's deadline so a lane/queue wait doesn't
// eat the run budget (mort's V10 lesson). Caller cancellation still propagates
// via MergeCancellation. Created BEFORE the step observer so the observer
// forwards the merged run context (not a possibly-cancelled caller ctx) to
// OnStep consumers.
//
// Two-tier timeout: who owns the hard deadline depends on the critic.
// - NO critic (the default): MaxRuntime is a literal WithTimeout. Its
// DeadlineExceeded propagates through the child chain (→ "timeout"),
// preserving the run's-own-timeout vs caller-cancel distinction.
// - critic OWNS the deadline (Ports.Critic set + ra.Critic.Enabled):
// MaxRuntime becomes the SOFT trigger (passed to startCritic), and the
// critic's extendable backstop — watched in startCritic, which cancels via
// cancelCause — is the real deadline. A slow-but-progressing run is given
// room up to that backstop; only a stalled one is killed. The base context
// gets a WithTimeout at CriticAbsoluteMax (default 24h) purely as a RUNAWAY
// guard for a critic that never advances its deadline: it is set FAR beyond
// any realistic backstop (the host clamps its own backstop to a much smaller
// absolute max, e.g. mort's 6h convar), so it does NOT pre-empt a healthy
// supervised run. If the host critic fails to ARM (nil handle), the run is
// unsupervised and we tighten the cap back down to MaxRuntime below.
// A NESTED cause-carrying layer (cancelCause) lets a critic kill surface as a
// distinct "killed": only an ErrCriticKill cause is consulted in statusFor; a
// generic run error, a backstop expiry, or a caller cancel is classified by the
// run error itself.
criticOwns := e.criticOwnsDeadline(ra)
hardCap := maxRuntime
if criticOwns {
// Runaway guard only — the critic's own (extendable) deadline-watch is the
// normal cap. Never shorter than the nominal budget, in case an operator
// sets MaxRuntime above the runaway ceiling (a degenerate config).
hardCap = e.cfg.Defaults.CriticAbsoluteMax
if hardCap < maxRuntime {
hardCap = maxRuntime
}
}
timeoutCtx, cancelTimeout := context.WithTimeout(context.WithoutCancel(ctx), hardCap)
defer cancelTimeout()
runCtx, cancelCause := context.WithCancelCause(timeoutCtx)
defer cancelCause(nil)
runCtx, mergeCancel := MergeCancellation(runCtx, ctx)
defer mergeCancel()
// Critic (optional): monitors the run for a stall, can nudge/extend/kill via
// its host Escalator. When it owns the deadline, MaxRuntime is its soft trigger
// (so a slow-but-progressing run survives past it); its extendable backstop is
// bound to runCtx (cancel on pass). nil-safe: no-op when no critic is configured
// or the agent doesn't enable it.
critic, stopCritic := e.startCritic(runCtx, cancelCause, ra, info, maxRuntime)
defer stopCritic()
// Unsupervised-run failsafe: the agent enabled the critic (so the base context
// got the generous runaway ceiling instead of MaxRuntime), but the host Monitor
// returned no handle — there is no deadline-watch. Without this the run would be
// bounded only by the 24h ceiling. Tighten it back to the nominal MaxRuntime so
// an unsupervised run can't hold its slot far past budget. mort's adapter always
// arms when the flag is set, so this is pure defence in depth.
if criticOwns && critic == nil {
var cancelUnsupervised context.CancelFunc
runCtx, cancelUnsupervised = context.WithTimeout(runCtx, maxRuntime)
defer cancelUnsupervised()
}
// The finalize defer (top of Run) now has a run context to read the
// cancellation cause from (shutdown vs critic-kill vs deadline vs cancel). Set
// AFTER the unsupervised-failsafe re-wrap so it reads the context the loop runs on.
checkpointCause = func() error { return context.Cause(runCtx) }
// Step instrumentation: accumulate Result.Steps + fire inv.OnStep, feed the
// audit recorder, and keep the live iteration counter fresh. majordomo's
// step observer hands us each completed iteration; we zip the model's tool
// calls with their executed results PAIRWISE — a result without a matching
// call (or a call without a result) is skipped rather than recorded as an
// empty-name "ghost" step.
emitter := newStepEmitter(inv.OnStep)
stepObserver := func(s agent.Step) {
if stateAcc != nil {
stateAcc.SetIteration(s.Index)
}
if rec != nil {
rec.OnStep(s.Index, s.Response)
}
critic.recordStep(s.Index, s.Response) // keep the critic's activity clock fresh + carry the step payload
var calls []llm.ToolCall
if s.Response != nil {
calls = s.Response.ToolCalls
}
n := len(s.Results)
if len(calls) < n {
n = len(calls)
}
for i := 0; i < n; i++ {
call, r := calls[i], s.Results[i]
critic.recordToolStart(call.Name, string(call.Arguments))
emitter.toolStart(runCtx, call.Name, call.Arguments)
emitter.toolEnd(runCtx, call, r.Content, r.IsError)
if rec != nil {
rec.OnTool(call, r.Content)
}
}
}
// Shared agent options used by BOTH the single-loop path and every phase: the
// tool-error guards and optional compaction. The toolbox, step ceiling, AND
// step observer are added per path (the observer is wrapped for checkpointing,
// which differs single-loop vs per-phase).
sharedOpts := []agent.Option{
agent.WithToolErrorLimits(e.cfg.Defaults.MaxConsecutiveToolErrors, e.cfg.Defaults.MaxSameToolCallRepeats),
}
if e.cfg.Compactor != nil && e.cfg.ContextTokens != nil {
if threshold := e.compactionThreshold(tier); threshold > 0 {
// Forward compaction events to the audit log (makes the
// CompactionEvent doc's "logged to the run trace" promise true).
var onFire func(compact.CompactionEvent)
if rec != nil {
onFire = func(ev compact.CompactionEvent) {
rec.LogEvent("compaction_fired", map[string]any{
"messages_before": ev.MessagesBefore,
"messages_after": ev.MessagesAfter,
"tokens_before": ev.TokensBefore,
"tokens_after": ev.TokensAfter,
})
}
}
sharedOpts = append(sharedOpts, agent.WithCompactor(e.cfg.Compactor(threshold, onFire)))
}
}
// Stage non-image input attachments (audio/PDF/binary) into the host file
// store and fold an [ATTACHED FILES] descriptor into the prompt so the agent
// can reach them by file_id. No-op when Ports.InputFiles is nil or there are
// no files. Done after the model/toolbox build but before the loop, so the
// descriptor rides the very first user turn.
input = e.stageInputFiles(runCtx, inv.RunID, ra.ID, inv.InputFiles, input)
// One WithSteer drains BOTH the session mailbox (a tool's AttachImages) and
// the critic's nudges before each step.
steer := func() []llm.Message { return append(mailbox.drain(), critic.drainSteer()...) }
resuming := resume != nil && len(resume.History) > 0
var runRes *agent.Result
var runErr error
if len(ra.Phases) == 0 {
// Single-loop run: the agent's base prompt + full toolbox, with the
// critic's DYNAMIC step ceiling (WithMaxStepsFunc, so it can raise a
// healthy-but-long run's budget mid-flight; falls back to maxIter).
//
// Checkpointing: wrap the step observer to accumulate the running transcript
// and Save it each step. Save is called every step; THROTTLING is the
// Checkpointer's responsibility (the battery + mort's durable-job adapter
// both throttle + size-cap), so the kernel doesn't gate the hot path. The
// accumulated transcript is the pre-compaction one (the observer sees raw
// step responses, not the loop's compacted history) — a host that caps size
// bounds it. A recovered run seeds the saved transcript and continues.
obs := stepObserver
if ckpt != nil {
var acc []llm.Message
if resuming {
acc = append([]llm.Message(nil), resume.History...)
} else {
acc = []llm.Message{multimodalUserMessage(input, inv.Images)}
}
obs = func(s agent.Step) {
stepObserver(s)
if s.Response != nil {
acc = append(acc, s.Response.Message())
}
if len(s.Results) > 0 {
acc = append(acc, llm.ToolResultsMessage(s.Results...))
}
_ = ckpt.Save(runCtx, RunCheckpointState{Messages: acc, Iteration: s.Index + 1})
}
}
opts := append([]agent.Option{
agent.WithToolbox(toolbox),
critic.maxStepsOption(maxIter),
agent.WithStepObserver(obs),
}, sharedOpts...)
ag := agent.New(model, e.systemPrompt(ra), opts...)
if resuming {
// Resume: seed the saved transcript and continue (no new input — the
// completed tool calls in the transcript are NOT re-run).
runRes, runErr = ag.Run(runCtx, "", agent.WithSteer(steer), agent.WithHistory(resume.History))
} else {
runRes, runErr = runAgent(runCtx, ag, input, inv.Images, agent.WithSteer(steer))
}
} else {
// Multi-phase pipeline: each phase runs its own prompt/tier/tools/step-cap
// sequentially, threading outputs through {{.<PhaseName>}} templates. The
// shared step observer (audit/steps/critic) is wired per phase by the phase
// runner; checkpointing is phase-boundary granular (completed phases are
// recorded so a resumed run skips them).
runRes, runErr = e.runPhases(runCtx, ra, phaseDeps{
baseModel: model,
baseToolbox: toolbox,
baseMaxIter: maxIter,
sharedOpts: sharedOpts,
stepObserver: stepObserver,
steer: steer,
rec: rec,
checkpointer: ckpt,
resume: resume,
}, input, inv.Images)
}
// Durable-recovery finalize (Complete/Fail/leave-running) happens in the
// top-of-Run defer so it covers panics + early build-error returns too.
status := statusFor(runCtx, runErr)
if runRes != nil {
res.Output = runRes.Output
res.Usage = runRes.Usage
}
res.Steps = emitter.snapshot()
res.Err = runErr
// PostRun: hand the SessionToolFactory's hook the full transcript (populated
// even on partial results) so it can produce artifacts. Best-effort +
// panic-isolated — a PostRun failure never fails an otherwise-successful run.
if postRun != nil {
var transcript []llm.Message
if runRes != nil {
transcript = runRes.Messages
}
// Detach from the caller's ctx: a finished/cancelled caller must not abort
// artifact production (the hook owns its own bounding, per its contract).
res.PostRunResult = runPostRun(detach(ctx), postRun, transcript, res.Output, runErr)
}
e.finishAudit(ctx, rec, status, res, started, runErr)
if e.cfg.Ports.Budget != nil {
e.cfg.Ports.Budget.Commit(detach(ctx), inv.CallerID, time.Since(started).Seconds())
}
e.deliver(ctx, inv, res, runErr)
return res
}
// statusFor maps a run error to a RunStats.Status, distinguishing a critic kill
// (killed), a deadline (timeout), and a cancellation (cancelled — caller cancel
// or shutdown) from a generic error so audit consumers can tell them apart. The
// run context's cancellation cause carries the distinction (ErrCriticKill /
// DeadlineExceeded), since ctx.Err() alone only reports Canceled.
func statusFor(runCtx context.Context, runErr error) string {
switch {
case runErr == nil:
return "ok"
// Only the kill is recovered from the cancellation cause — a critic kill
// surfaces as a plain Canceled run error, so without this it'd read as
// "cancelled". Everything else is classified by the run error itself, so a
// genuine run error is never relabeled just because the context was later
// cancelled, and a caller cancel/deadline stays "cancelled" (not "timeout").
case errors.Is(context.Cause(runCtx), ErrCriticKill):
return "killed"
case errors.Is(runErr, context.DeadlineExceeded):
return "timeout"
case errors.Is(runErr, context.Canceled):
return "cancelled"
default:
return "error"
}
}
// finishAudit writes the terminal roll-up on a detached context so a cancelled
// run still records (mort's CleanupContextTimeout lesson).
func (e *Executor) finishAudit(ctx context.Context, rec RunRecorder, status string, res Result, started time.Time, runErr error) {
if rec == nil {
return
}
stats := RunStats{
Status: status,
Output: res.Output,
ToolCalls: rec.ToolCallsCount(),
RuntimeSeconds: time.Since(started).Seconds(),
}
if runErr != nil {
stats.Error = runErr.Error()
}
stats.InputTokens, stats.OutputTokens, stats.ThinkingTokens = rec.TokenStats()
rec.Close(detach(ctx), stats)
}
func (e *Executor) systemPrompt(ra RunnableAgent) string {
return e.systemPromptWithBody(ra.SystemPrompt)
}
// systemPromptWithBody composes the optional platform header with an arbitrary
// body. The single-loop path passes ra.SystemPrompt; the phase runner passes a
// phase's expanded instructions, so each phase keeps the platform header.
func (e *Executor) systemPromptWithBody(body string) string {
if e.cfg.SystemHeader == "" {
return body
}
if body == "" {
return e.cfg.SystemHeader
}
return e.cfg.SystemHeader + "\n\n" + body
}
// compactionThreshold returns the token threshold for the tier's model context
// window (ratio × limit), or 0 when the limit is unknown.
func (e *Executor) compactionThreshold(tier string) int {
max := e.cfg.ContextTokens(tier)
if max <= 0 {
return 0
}
return int(float64(max) * e.cfg.Defaults.CompactionThresholdRatio)
}
// deliver posts the run's output (or error) via run.Ports.Delivery when both a
// Delivery and a target (inv.DeliveryID) are set. No target = the caller reads
// Result.Output itself (the synchronous default). Best-effort + detached: a
// delivery failure must not change the run's outcome.
func (e *Executor) deliver(ctx context.Context, inv tool.Invocation, res Result, runErr error) {
if e.cfg.Ports.Delivery == nil || inv.DeliveryID == "" {
return
}
target := deliver.Target{Kind: inv.DeliveryKind, ID: inv.DeliveryID}
dctx := detach(ctx)
if runErr != nil {
_ = e.cfg.Ports.Delivery.DeliverError(dctx, target, runErr)
return
}
_, _ = e.cfg.Ports.Delivery.Deliver(dctx, target, res.Output, nil)
}
// detach derives a bounded cleanup context off ctx, detached from its
// cancellation, for post-run writes. The cancel is intentionally not returned;
// CleanupContextTimeout bounds the lifetime.
func detach(ctx context.Context) context.Context {
c, cancel := context.WithTimeout(context.WithoutCancel(ctx), CleanupContextTimeout)
_ = cancel // bounded by the timeout; nothing to cancel early
return c
}
// runAgent dispatches the majordomo agent loop. majordomo's Run takes a text-only
// input arg, so when the invocation carries images they're folded into the first
// user message (text + image parts) via WithHistory and Run is called with an
// empty input — the model then sees a multimodal opening turn. The image-less path
// passes the prompt straight through.
//
// The text part is omitted when input is blank (image-only run), matching
// runSession.AttachImages so no empty TextPart is sent.
func runAgent(ctx context.Context, ag *agent.Agent, input string, images []llm.ImagePart, opts ...agent.RunOption) (*agent.Result, error) {
if len(images) == 0 {
return ag.Run(ctx, input, opts...)
}
// Copy opts before appending so a caller-supplied backing array is never
// mutated/aliased (the variadic slice can have spare capacity). The multimodal
// opening turn (text + image parts) is built by the shared helper.
opts = append(opts[:len(opts):len(opts)], agent.WithHistory([]llm.Message{multimodalUserMessage(input, images)}))
return ag.Run(ctx, "", opts...)
}