P2: run kernel + run.Ports inversion — executus is runnable #2

Merged
steve merged 9 commits from phase-2-run-kernel into main 2026-06-27 02:02:21 +00:00
4 changed files with 321 additions and 2 deletions
Showing only changes of commit aab950f1c3 - Show all commits
+4 -2
View File
@@ -43,8 +43,10 @@ CORE (majordomo + stdlib):
fanout/ programmatic N×M swarm [P0 ✓]
deliver/ output egress seam (+ Discard/Stdout) [P0 ✓]
identity/ caller identity seams [P0 ✓]
run/ progress bridge now; the executor kernel + [P0 partial]
nil-safe Ports + RunnableAgent later [P2]
run/ run-loop mechanics (cancel-merge, finalizers, [P2 wip]
RunStateAccessor via RunTally seam, submit,
progress bridge) + RunnableAgent DTO done;
executor merge + nil-safe run.Ports next [P2]
dispatchguard/ loop/depth/fan-out caps [P0 ✓]
pendingattach/ attachment dedupe [P0 ✓]
tool/ registry + 3-stage permissions + ssrf [P1 ✓]
+76
View File
@@ -0,0 +1,76 @@
package run
import "time"
// RunnableAgent is the kernel's view of "a thing to run": an identity, a model
// tier, a system prompt, execution caps, and a tool palette. It is a plain DTO
// on purpose — the run kernel never imports a noun battery. The persona Agent
// and the saved Skill each LOWER themselves into a RunnableAgent (a ToRunnable
// method on the battery side), and the kernel runs the DTO. This is the
// inversion of mort's agentexec.Executor.Run(*agents.Agent): the executor no
// longer depends on the persona struct, only on this shape.
//
// A light host can build a RunnableAgent inline (model tier + prompt + a few
// tool names) for a one-shot bounded run, with no persona or skill battery at
// all — that is exactly gadfly's swarm task.
type RunnableAgent struct {
// ID is a stable identifier for the run subject (an agent/skill UUID, or
// any host-chosen id). Used for audit attribution and dispatch-guard
// genealogy. Empty is allowed for anonymous one-shot runs.
ID string
// Name is a human label (audit/logs/delivery). Empty is allowed.
Name string
// SystemPrompt is the agent's base system prompt (before per-run
// personalization, which a host layers via Ports).
SystemPrompt string
// ModelTier is a tier alias or concrete spec resolved through
// model.ParseModelForContext. Empty resolves to the host's default tier.
ModelTier string
// MaxIterations caps the agent loop's tool-dispatch steps. 0 = kernel
// default. MaxRuntime caps wall-clock for the whole run (the kernel starts
// this clock AFTER any lane dequeue, not at submission). 0 = kernel
// default.
MaxIterations int
MaxRuntime time.Duration
// LowLevelTools are tool-registry names the run may call directly.
// SkillPalette / SubAgentPalette name saved skills / sub-agents exposed as
// skill__<name> / agent__<name> delegation tools, resolved through
// Ports.Palette (nil Palette => those entries are inert).
LowLevelTools []string
SkillPalette []string
SubAgentPalette []string
// Phases optionally model a multi-step pipeline (each phase its own prompt
// + tier + tools). An empty slice is a single-phase run — the common case.
Phases []Phase
// Critic configures the optional two-tier run-critic (Ports.Critic). The
// zero value (disabled) is the light-host default.
Critic CriticConfig
}
// Phase is one step of a multi-step run: its own system prompt, model tier,
// iteration cap, and tool subset. Optional phases may be skipped by the
// pipeline when their precondition isn't met.
type Phase struct {
Name string
SystemPrompt string
ModelTier string
MaxIterations int
Tools []string
Optional bool
}
// CriticConfig configures the optional run-critic. Enabled gates whether a
// critic monitor is started at all; BackstopMultiplier sets the hard-kill
// deadline as a multiple of the soft trigger (MaxRuntime). A non-positive
// multiplier uses the kernel default.
type CriticConfig struct {
Enabled bool
BackstopMultiplier float64
}
+157
View File
@@ -0,0 +1,157 @@
// Package run is executus's run kernel: the shared run-loop mechanics around
// majordomo's agent loop, plus the host seams (run.Ports / RunnableAgent) that
// let one executor serve every surface — a light host's bounded one-shot run,
// a heavy host's persona agent or saved skill — without the kernel importing a
// battery.
//
// This file holds the genuinely-identical scaffolding both run shapes need:
// context cancellation merging, the detached-cleanup timeout, the per-run
// progress accessor the self-status tool reads, the legacy `submit`
// compatibility tool (submit.go), the ancestor progress bridge (progress.go),
// and the run-finalizer machinery — one source of truth.
//
// The kernel depends only on majordomo + executus/tool + the run.Ports
// interfaces; persistence, audit, the persona/skill nouns, and the critic are
// host-supplied via Ports (see ports.go) so importing the kernel never drags in
// a store or a battery.
package run
import (
"context"
"errors"
"log/slog"
"sync/atomic"
"time"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
// ErrShutdown is the cancellation cause set on mort's base lifecycle context
// when the process is shutting down (SIGTERM after the drain window). The
// agent executor uses it to distinguish a run interrupted by shutdown (which
// should be left durable-recoverable) from a run that errored or hit its own
// deadline (terminal).
var ErrShutdown = errors.New("mort: shutting down")
// CleanupContextTimeout caps how long a run's post-completion cleanup ops
// (budget commit, audit Close, attachment bookkeeping) may wait on
// storage after detaching from the caller's — possibly already
// cancelled — context. 10s is generous for a single-row UPDATE against
// MySQL; longer suggests a hung connection the run goroutine shouldn't
// keep waiting on. Both executors derive their cleanup contexts as
// context.WithTimeout(context.WithoutCancel(ctx), CleanupContextTimeout).
const CleanupContextTimeout = 10 * time.Second
// Reserved state-react lifecycle event keys, shared so both nouns surface
// the same UX shape. Namespaced with double-underscores to make accidental
// collision with a tool name near-impossible.
const (
StateReactStart = "__start__"
StateReactEnd = "__end__"
StateReactError = "__error__"
StateReactBudgetExceeded = "__budget_exceeded__"
)
// MergeCancellation returns a context cancelled when EITHER input is
// cancelled, propagating the cancellation Cause from whichever fired. Used
// by the lane preemption path (the lane's per-job ctx.Cause flows into the
// run context) and by the runtime-detach path (process shutdown still
// reaches a run whose deadline was reset after a lane wait). Always call
// the returned cancel to release the watcher goroutine; it is also invoked
// once when either input fires.
func MergeCancellation(parent, secondary context.Context) (context.Context, context.CancelFunc) {
merged, cancel := context.WithCancelCause(parent)
go func() {
select {
case <-merged.Done():
return
case <-secondary.Done():
cancel(context.Cause(secondary))
}
}()
return merged, func() { cancel(nil) }
}
// RunFinalizer is invoked at run finish so per-run tool state (open HTTP
// streams, per-run code_exec counters, per-run search budgets) is released
// and the process-lifetime maps keyed by run id don't grow unbounded.
// Both executors fire their registered finalizers via FireFinalizers.
type RunFinalizer interface {
FinalizeRun(runID string)
}
// FireFinalizers runs every finalizer for runID, isolating each behind a
// panic-recover so one buggy finalizer can't take down the run goroutine
// or skip the others. Safe to call with a nil/empty slice.
func FireFinalizers(fs []RunFinalizer, runID string) {
for _, f := range fs {
if f == nil {
continue
}
func() {
defer func() {
if r := recover(); r != nil {
slog.Error("runengine: run finalizer panicked",
"run_id", runID, "panic", r)
}
}()
f.FinalizeRun(runID)
}()
}
}
// RunTally is the narrow live-progress source the RunStateAccessor reads —
// the running token and tool-call counts for the in-flight run. The audit
// battery's writer satisfies it; this interface is how the run kernel reads
// live tallies without importing the audit package (the inversion of mort's
// direct *skillaudit.Writer dependency).
type RunTally interface {
// TokenStats returns the running input, output, and thinking token totals.
TokenStats() (in, out, thinking int64)
// ToolCallsCount returns the number of tool calls executed so far.
ToolCallsCount() int
}
// RunStateAccessor is the per-run live-progress accessor the executor
// stamps on Invocation.RunState before building the toolbox, so the
// self-status tool can report iteration / tool-calls / tokens / elapsed for
// the in-flight run. Construct with NewRunStateAccessor; the executor's step
// observer calls SetIteration each loop.
type RunStateAccessor struct {
tally RunTally
iter atomic.Int32
maxIter int
maxCalls int
startedAt time.Time
}
// NewRunStateAccessor builds the accessor. writer supplies the live token
// + tool-call tallies; maxIter / maxCalls are the reported caps (0 =
// uncapped); startedAt anchors the elapsed clock.
func NewRunStateAccessor(tally RunTally, maxIter, maxCalls int, startedAt time.Time) *RunStateAccessor {
return &RunStateAccessor{
tally: tally,
maxIter: maxIter,
maxCalls: maxCalls,
startedAt: startedAt,
}
}
// SetIteration records the current agent-loop iteration (called from the
// executor's step observer).
func (a *RunStateAccessor) SetIteration(iter int) { a.iter.Store(int32(iter)) }
// RunState satisfies tool.RunStateAccessor.
func (a *RunStateAccessor) RunState() tool.RunState {
in, out, think := a.tally.TokenStats()
return tool.RunState{
Iteration: int(a.iter.Load()),
MaxIterations: a.maxIter,
ToolCalls: a.tally.ToolCallsCount(),
MaxToolCalls: a.maxCalls,
InputTokens: in,
OutputTokens: out,
ThinkingTokens: think,
ElapsedSeconds: int(time.Since(a.startedAt).Seconds()),
}
}
+84
View File
@@ -0,0 +1,84 @@
package run
import (
"context"
"strings"
"sync"
llm "gitea.stevedudenhoeffer.com/steve/majordomo/llm"
)
// SubmitCapture records the output a run's `submit` tool received.
//
// Why this exists: legacy agentkit injected a synthetic `submit` tool and
// ended the loop when it fired; years of mort system prompts (agent
// YAMLs, skill manifests, the executors' platform headers) teach the
// model to "call submit with your final answer". majordomo's agent loop
// has no submit concept — it ends when the model replies WITHOUT tool
// calls. Dropping submit cold would make every prompt-trained model
// burn turns on "unknown tool \"submit\"" errors.
//
// The compatibility shape: the executors add NewSubmitTool's tool to
// every run's toolset (unless the palette already defines a `submit`).
// The handler records the FIRST submitted answer and tells the model
// the answer was accepted so its next turn is a bare reply (which ends
// the loop naturally). After the run, the executor consults
// Output(loopOutput, runErr): a captured submission wins over an empty
// or budget-exhausted ending, so a model that submits on its final
// allowed step still produces its answer instead of ErrMaxSteps.
type SubmitCapture struct {
mu sync.Mutex
output string
called bool
}
// Record stores the first submitted answer; later calls are ignored
// (matching legacy agentkit's "multiple calls keep the first" contract).
func (c *SubmitCapture) Record(output string) {
c.mu.Lock()
defer c.mu.Unlock()
if c.called {
return
}
c.called = true
c.output = output
}
// Submitted returns the captured answer and whether submit fired.
func (c *SubmitCapture) Submitted() (string, bool) {
c.mu.Lock()
defer c.mu.Unlock()
return c.output, c.called
}
// Output resolves the run's final output: the submitted answer when the
// model called submit (parity with legacy agentkit, where submit's argument
// WAS the run output), otherwise the loop's own final text. resolvedErr
// is nil when a submission exists — a run that submitted its answer and
// then ran out of steps (or timed out composing the courtesy
// confirmation turn) is a SUCCESS, not an error.
func (c *SubmitCapture) Output(loopOutput string, runErr error) (output string, resolvedErr error) {
if out, ok := c.Submitted(); ok {
return out, nil
}
return loopOutput, runErr
}
// submitArgs mirrors legacy agentkit's synthetic submit tool schema so
// models prompted under the old contract emit compatible calls.
type submitArgs struct {
Output string `json:"output" description:"The final answer, summary, or output for this task."`
}
// NewSubmitTool builds the compatibility `submit` tool bound to the
// given capture. Both executors (skill + agent) install one per run.
func NewSubmitTool(capture *SubmitCapture) llm.Tool {
return llm.DefineTool[submitArgs](
"submit",
"Submit your final answer or output to end this task. Call exactly once when you are done.",
func(_ context.Context, args submitArgs) (any, error) {
capture.Record(strings.TrimSpace(args.Output))
return "Final answer recorded. Do not call any more tools; reply now with a brief closing message.", nil
},
)
}