diff --git a/CLAUDE.md b/CLAUDE.md index dcc609e..f9b1dca 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -43,8 +43,10 @@ CORE (majordomo + stdlib): fanout/ programmatic N×M swarm [P0 ✓] deliver/ output egress seam (+ Discard/Stdout) [P0 ✓] identity/ caller identity seams [P0 ✓] - run/ progress bridge now; the executor kernel + [P0 partial] - nil-safe Ports + RunnableAgent later [P2] + run/ run-loop mechanics (cancel-merge, finalizers, [P2 wip] + RunStateAccessor via RunTally seam, submit, + progress bridge) + RunnableAgent DTO done; + executor merge + nil-safe run.Ports next [P2] dispatchguard/ loop/depth/fan-out caps [P0 ✓] pendingattach/ attachment dedupe [P0 ✓] tool/ registry + 3-stage permissions + ssrf [P1 ✓] diff --git a/run/agent.go b/run/agent.go new file mode 100644 index 0000000..18a0a98 --- /dev/null +++ b/run/agent.go @@ -0,0 +1,76 @@ +package run + +import "time" + +// RunnableAgent is the kernel's view of "a thing to run": an identity, a model +// tier, a system prompt, execution caps, and a tool palette. It is a plain DTO +// on purpose — the run kernel never imports a noun battery. The persona Agent +// and the saved Skill each LOWER themselves into a RunnableAgent (a ToRunnable +// method on the battery side), and the kernel runs the DTO. This is the +// inversion of mort's agentexec.Executor.Run(*agents.Agent): the executor no +// longer depends on the persona struct, only on this shape. +// +// A light host can build a RunnableAgent inline (model tier + prompt + a few +// tool names) for a one-shot bounded run, with no persona or skill battery at +// all — that is exactly gadfly's swarm task. +type RunnableAgent struct { + // ID is a stable identifier for the run subject (an agent/skill UUID, or + // any host-chosen id). Used for audit attribution and dispatch-guard + // genealogy. Empty is allowed for anonymous one-shot runs. + ID string + + // Name is a human label (audit/logs/delivery). Empty is allowed. 
+ Name string + + // SystemPrompt is the agent's base system prompt (before per-run + // personalization, which a host layers via Ports). + SystemPrompt string + + // ModelTier is a tier alias or concrete spec resolved through + // model.ParseModelForContext. Empty resolves to the host's default tier. + ModelTier string + + // MaxIterations caps the agent loop's tool-dispatch steps. 0 = kernel + // default. MaxRuntime caps wall-clock for the whole run (the kernel starts + // this clock AFTER any lane dequeue, not at submission). 0 = kernel + // default. + MaxIterations int + MaxRuntime time.Duration + + // LowLevelTools are tool-registry names the run may call directly. + // SkillPalette / SubAgentPalette name saved skills / sub-agents exposed as + // skill__ / agent__ delegation tools, resolved through + // Ports.Palette (nil Palette => those entries are inert). + LowLevelTools []string + SkillPalette []string + SubAgentPalette []string + + // Phases optionally model a multi-step pipeline (each phase its own prompt + // + tier + tools). An empty slice is a single-phase run — the common case. + Phases []Phase + + // Critic configures the optional two-tier run-critic (Ports.Critic). The + // zero value (disabled) is the light-host default. + Critic CriticConfig +} + +// Phase is one step of a multi-step run: its own system prompt, model tier, +// iteration cap, and tool subset. Optional phases may be skipped by the +// pipeline when their precondition isn't met. +type Phase struct { + Name string + SystemPrompt string + ModelTier string + MaxIterations int + Tools []string + Optional bool +} + +// CriticConfig configures the optional run-critic. Enabled gates whether a +// critic monitor is started at all; BackstopMultiplier sets the hard-kill +// deadline as a multiple of the soft trigger (MaxRuntime). A non-positive +// multiplier uses the kernel default. 
+type CriticConfig struct { + Enabled bool + BackstopMultiplier float64 +} diff --git a/run/runengine.go b/run/runengine.go new file mode 100644 index 0000000..2b311e4 --- /dev/null +++ b/run/runengine.go @@ -0,0 +1,157 @@ +// Package run is executus's run kernel: the shared run-loop mechanics around +// majordomo's agent loop, plus the host seams (run.Ports / RunnableAgent) that +// let one executor serve every surface — a light host's bounded one-shot run, +// a heavy host's persona agent or saved skill — without the kernel importing a +// battery. +// +// This file holds the genuinely-identical scaffolding both run shapes need: +// context cancellation merging, the detached-cleanup timeout, the per-run +// progress accessor the self-status tool reads, the legacy `submit` +// compatibility tool (submit.go), the ancestor progress bridge (progress.go), +// and the run-finalizer machinery — one source of truth. +// +// The kernel depends only on majordomo + executus/tool + the run.Ports +// interfaces; persistence, audit, the persona/skill nouns, and the critic are +// host-supplied via Ports (see ports.go) so importing the kernel never drags in +// a store or a battery. +package run + +import ( + "context" + "errors" + "log/slog" + "sync/atomic" + "time" + + "gitea.stevedudenhoeffer.com/steve/executus/tool" +) + +// ErrShutdown is the cancellation cause set on mort's base lifecycle context +// when the process is shutting down (SIGTERM after the drain window). The +// agent executor uses it to distinguish a run interrupted by shutdown (which +// should be left durable-recoverable) from a run that errored or hit its own +// deadline (terminal). +var ErrShutdown = errors.New("mort: shutting down") + +// CleanupContextTimeout caps how long a run's post-completion cleanup ops +// (budget commit, audit Close, attachment bookkeeping) may wait on +// storage after detaching from the caller's — possibly already +// cancelled — context. 
10s is generous for a single-row UPDATE against +// MySQL; longer suggests a hung connection the run goroutine shouldn't +// keep waiting on. Both executors derive their cleanup contexts as +// context.WithTimeout(context.WithoutCancel(ctx), CleanupContextTimeout). +const CleanupContextTimeout = 10 * time.Second + +// Reserved state-react lifecycle event keys, shared so both nouns surface +// the same UX shape. Namespaced with double-underscores to make accidental +// collision with a tool name near-impossible. +const ( + StateReactStart = "__start__" + StateReactEnd = "__end__" + StateReactError = "__error__" + StateReactBudgetExceeded = "__budget_exceeded__" +) + +// MergeCancellation returns a context cancelled when EITHER input is +// cancelled, propagating the cancellation Cause from whichever fired. Used +// by the lane preemption path (the lane's per-job ctx.Cause flows into the +// run context) and by the runtime-detach path (process shutdown still +// reaches a run whose deadline was reset after a lane wait). Always call +// the returned cancel to release the watcher goroutine; the watcher also +// exits on its own when either input fires. +func MergeCancellation(parent, secondary context.Context) (context.Context, context.CancelFunc) { + merged, cancel := context.WithCancelCause(parent) + go func() { + select { + case <-merged.Done(): + return + case <-secondary.Done(): + cancel(context.Cause(secondary)) + } + }() + return merged, func() { cancel(nil) } +} + +// RunFinalizer is invoked at run finish so per-run tool state (open HTTP +// streams, per-run code_exec counters, per-run search budgets) is released +// and the process-lifetime maps keyed by run id don't grow unbounded. +// Both executors fire their registered finalizers via FireFinalizers. 
+type RunFinalizer interface { + FinalizeRun(runID string) +} + +// FireFinalizers runs every finalizer for runID, isolating each behind a +// panic-recover so one buggy finalizer can't take down the run goroutine +// or skip the others. Safe to call with a nil/empty slice. +func FireFinalizers(fs []RunFinalizer, runID string) { + for _, f := range fs { + if f == nil { + continue + } + func() { + defer func() { + if r := recover(); r != nil { + slog.Error("runengine: run finalizer panicked", + "run_id", runID, "panic", r) + } + }() + f.FinalizeRun(runID) + }() + } +} + +// RunTally is the narrow live-progress source the RunStateAccessor reads — +// the running token and tool-call counts for the in-flight run. The audit +// battery's writer satisfies it; this interface is how the run kernel reads +// live tallies without importing the audit package (the inversion of mort's +// direct *skillaudit.Writer dependency). +type RunTally interface { + // TokenStats returns the running input, output, and thinking token totals. + TokenStats() (in, out, thinking int64) + // ToolCallsCount returns the number of tool calls executed so far. + ToolCallsCount() int +} + +// RunStateAccessor is the per-run live-progress accessor the executor +// stamps on Invocation.RunState before building the toolbox, so the +// self-status tool can report iteration / tool-calls / tokens / elapsed for +// the in-flight run. Construct with NewRunStateAccessor; the executor's step +// observer calls SetIteration each loop. +type RunStateAccessor struct { + tally RunTally + iter atomic.Int32 + maxIter int + maxCalls int + startedAt time.Time +} + +// NewRunStateAccessor builds the accessor. tally supplies the live token +// + tool-call tallies; maxIter / maxCalls are the reported caps (0 = +// uncapped); startedAt anchors the elapsed clock. 
+func NewRunStateAccessor(tally RunTally, maxIter, maxCalls int, startedAt time.Time) *RunStateAccessor { + return &RunStateAccessor{ + tally: tally, + maxIter: maxIter, + maxCalls: maxCalls, + startedAt: startedAt, + } +} + +// SetIteration records the current agent-loop iteration (called from the +// executor's step observer). +func (a *RunStateAccessor) SetIteration(iter int) { a.iter.Store(int32(iter)) } + +// RunState satisfies tool.RunStateAccessor. +func (a *RunStateAccessor) RunState() tool.RunState { + in, out, think := a.tally.TokenStats() + return tool.RunState{ + Iteration: int(a.iter.Load()), + MaxIterations: a.maxIter, + ToolCalls: a.tally.ToolCallsCount(), + MaxToolCalls: a.maxCalls, + InputTokens: in, + OutputTokens: out, + ThinkingTokens: think, + ElapsedSeconds: int(time.Since(a.startedAt).Seconds()), + } +} diff --git a/run/submit.go b/run/submit.go new file mode 100644 index 0000000..59ae221 --- /dev/null +++ b/run/submit.go @@ -0,0 +1,84 @@ +package run + +import ( + "context" + "strings" + "sync" + + llm "gitea.stevedudenhoeffer.com/steve/majordomo/llm" +) + +// SubmitCapture records the output a run's `submit` tool received. +// +// Why this exists: legacy agentkit injected a synthetic `submit` tool and +// ended the loop when it fired; years of mort system prompts (agent +// YAMLs, skill manifests, the executors' platform headers) teach the +// model to "call submit with your final answer". majordomo's agent loop +// has no submit concept — it ends when the model replies WITHOUT tool +// calls. Dropping submit cold would make every prompt-trained model +// burn turns on "unknown tool \"submit\"" errors. +// +// The compatibility shape: the executors add NewSubmitTool's tool to +// every run's toolset (unless the palette already defines a `submit`). +// The handler records the FIRST submitted answer and tells the model +// the answer was accepted so its next turn is a bare reply (which ends +// the loop naturally). 
After the run, the executor consults +// Output(loopOutput, runErr): a captured submission wins over an empty +// or budget-exhausted ending, so a model that submits on its final +// allowed step still produces its answer instead of ErrMaxSteps. +type SubmitCapture struct { + mu sync.Mutex + output string + called bool +} + +// Record stores the first submitted answer; later calls are ignored +// (matching legacy agentkit's "multiple calls keep the first" contract). +func (c *SubmitCapture) Record(output string) { + c.mu.Lock() + defer c.mu.Unlock() + if c.called { + return + } + c.called = true + c.output = output +} + +// Submitted returns the captured answer and whether submit fired. +func (c *SubmitCapture) Submitted() (string, bool) { + c.mu.Lock() + defer c.mu.Unlock() + return c.output, c.called +} + +// Output resolves the run's final output: the submitted answer when the +// model called submit (parity with legacy agentkit, where submit's argument +// WAS the run output), otherwise the loop's own final text. resolvedErr +// is nil when a submission exists — a run that submitted its answer and +// then ran out of steps (or timed out composing the courtesy +// confirmation turn) is a SUCCESS, not an error. +func (c *SubmitCapture) Output(loopOutput string, runErr error) (output string, resolvedErr error) { + if out, ok := c.Submitted(); ok { + return out, nil + } + return loopOutput, runErr +} + +// submitArgs mirrors legacy agentkit's synthetic submit tool schema so +// models prompted under the old contract emit compatible calls. +type submitArgs struct { + Output string `json:"output" description:"The final answer, summary, or output for this task."` +} + +// NewSubmitTool builds the compatibility `submit` tool bound to the +// given capture. Both executors (skill + agent) install one per run. +func NewSubmitTool(capture *SubmitCapture) llm.Tool { + return llm.DefineTool[submitArgs]( + "submit", + "Submit your final answer or output to end this task. 
Call exactly once when you are done.", + func(_ context.Context, args submitArgs) (any, error) { + capture.Record(strings.TrimSpace(args.Output)) + return "Final answer recorded. Do not call any more tools; reply now with a brief closing message.", nil + }, + ) +}