C0b: wire Critic + Delivery into run.Executor
executus CI / test (pull_request) Failing after 1m0s
Adversarial Review (Gadfly) / review (pull_request) Successful in 5m9s

Continues finishing the executor's run.Ports wiring (after C0's Palette).

Critic (run/critic.go): when Ports.Critic is set and the agent enables it, the
executor calls Monitor at run start, feeds RecordStep/RecordToolStart from the
step observer, drains the critic's Steer messages into the loop via
agent.WithSteer, and binds the run's hard cancellation to the critic's
(extendable) Deadline through a watch goroutine — a healthy-but-slow run gets
room while a hung one is killed. Stop() on run end. Soft timeout from
Defaults.CriticSoftTimeout (default 90s). nil-safe: no critic / not-enabled =
no-op.

Delivery (run/executor.go deliver): after the run, when Ports.Delivery is set
and inv.DeliveryID is non-empty, the executor posts Result.Output (or
DeliverError on failure) to a host-interpreted deliver.Target
{inv.DeliveryKind, inv.DeliveryID}. Empty target = caller reads Result.Output
itself (the synchronous default; the `.agent run` canary). Best-effort +
detached.

tool.Invocation gains DeliveryKind/DeliveryID (host-set egress target).

Tests: critic monitored/fed/steered/stopped when enabled, untouched when not;
delivery posts on a target, skips without one. Deferred: Checkpointer (needs a
majordomo hook to snapshot the running message history).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-27 10:00:05 -04:00
parent 0c80679719
commit 43b2471737
6 changed files with 317 additions and 14 deletions
+4 -3
View File
@@ -47,9 +47,10 @@ CORE (majordomo + stdlib):
toolbox + majordomo loop + compaction + toolbox + majordomo loop + compaction +
run-bounding (V10 detached timeout) + step/ run-bounding (V10 detached timeout) + step/
audit observers + Budget gate; RunnableAgent audit observers + Budget gate; RunnableAgent
DTO + nil-safe run.Ports. Palette delegation DTO + nil-safe run.Ports. Palette delegation +
WIRED (skill__/agent__ tools, C0). Follow-ups: Critic (monitor/deadline/steer) + Delivery
wire Critic/Checkpointer/Delivery, Phases [C0b] WIRED. Follow-ups: Checkpointer (needs a
majordomo msg-history hook), Phases [C0c]
dispatchguard/ loop/depth/fan-out caps [P0 ✓] dispatchguard/ loop/depth/fan-out caps [P0 ✓]
pendingattach/ attachment dedupe [P0 ✓] pendingattach/ attachment dedupe [P0 ✓]
tool/ registry + 3-stage permissions + ssrf [P1 ✓] tool/ registry + 3-stage permissions + ssrf [P1 ✓]
+87
View File
@@ -0,0 +1,87 @@
package run
import (
"context"
"time"
"gitea.stevedudenhoeffer.com/steve/majordomo/agent"
)
// criticDeadlineCheck is how often the deadline-watch goroutine polls the
// critic's hard deadline. Small relative to any realistic soft timeout.
const criticDeadlineCheck = time.Second
// criticBinding wires a CriticHandle into a run: the executor forwards activity
// (steps + tool starts) to it, binds the run's hard cancellation to the critic's
// extendable deadline, and exposes the critic's Steer messages as an agent
// RunOption. All methods are nil-safe so the executor can call them
// unconditionally when no critic is configured.
type criticBinding struct {
h CriticHandle
}
// startCritic begins critic monitoring for this run when one is configured and
// the agent enables it. It launches a goroutine that cancels runCtx (via cancel)
// the moment the critic's hard deadline passes — the critic may extend that
// deadline, so a healthy-but-slow run is given room while a hung one is killed.
// Returns (nil, no-op stop) when there is no critic. The caller MUST defer the
// returned stop.
func (e *Executor) startCritic(runCtx context.Context, cancel context.CancelFunc, ra RunnableAgent, info RunInfo) (*criticBinding, func()) {
noop := func() {}
if e.cfg.Ports.Critic == nil || !ra.Critic.Enabled {
return nil, noop
}
soft := e.cfg.Defaults.CriticSoftTimeout
if soft <= 0 {
return nil, noop
}
h := e.cfg.Ports.Critic.Monitor(runCtx, info, soft)
if h == nil {
return nil, noop
}
done := make(chan struct{})
go func() {
t := time.NewTicker(criticDeadlineCheck)
defer t.Stop()
for {
select {
case <-done:
return
case <-runCtx.Done():
return
case <-t.C:
// A zero deadline = no hard cap (not yet set); otherwise cancel
// once we're at or past it.
if d := h.Deadline(); !d.IsZero() && !time.Now().Before(d) {
cancel()
return
}
}
}
}()
return &criticBinding{h: h}, func() {
close(done)
h.Stop()
}
}
func (b *criticBinding) recordStep(iter int) {
if b != nil {
b.h.RecordStep(iter)
}
}
func (b *criticBinding) recordToolStart(name, args string) {
if b != nil {
b.h.RecordToolStart(name, args)
}
}
// steerOptions returns the agent RunOptions that drain the critic's steer
// messages into the loop. Empty when there is no critic.
func (b *criticBinding) steerOptions() []agent.RunOption {
if b == nil {
return nil
}
return []agent.RunOption{agent.WithSteer(b.h.Steer)}
}
+88
View File
@@ -0,0 +1,88 @@
package run_test
import (
"context"
"sync"
"testing"
"time"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
"gitea.stevedudenhoeffer.com/steve/executus/run"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
type fakeCritic struct{ h *fakeCriticHandle }
func (c *fakeCritic) Monitor(_ context.Context, _ run.RunInfo, _ time.Duration) run.CriticHandle {
return c.h
}
type fakeCriticHandle struct {
mu sync.Mutex
steps, tools, stops int
steered int
}
func (h *fakeCriticHandle) RecordStep(int) { h.mu.Lock(); h.steps++; h.mu.Unlock() }
func (h *fakeCriticHandle) RecordToolStart(string, string) {
h.mu.Lock()
h.tools++
h.mu.Unlock()
}
func (h *fakeCriticHandle) Steer() []llm.Message { h.mu.Lock(); h.steered++; h.mu.Unlock(); return nil }
func (h *fakeCriticHandle) Deadline() time.Time { return time.Time{} } // no hard deadline
func (h *fakeCriticHandle) Stop() { h.mu.Lock(); h.stops++; h.mu.Unlock() }
// TestCriticWired: an agent with Critic.Enabled gets monitored — Monitor returns
// a handle the executor feeds (RecordStep), drains (Steer), and stops.
func TestCriticWired(t *testing.T) {
h := &fakeCriticHandle{}
fp := fake.New("fake")
fp.Enqueue("m", fake.Reply("done"))
m, _ := fp.Model("m")
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
Ports: run.Ports{Critic: &fakeCritic{h: h}},
})
res := ex.Run(context.Background(),
run.RunnableAgent{Name: "watched", ModelTier: "m", Critic: run.CriticConfig{Enabled: true}},
tool.Invocation{RunID: "r"}, "go")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
h.mu.Lock()
defer h.mu.Unlock()
if h.steps < 1 {
t.Errorf("critic should have seen >=1 step, got %d", h.steps)
}
if h.steered < 1 {
t.Errorf("critic Steer should be drained at least once, got %d", h.steered)
}
if h.stops != 1 {
t.Errorf("critic Stop should be called exactly once, got %d", h.stops)
}
}
// TestCriticDisabledNotMonitored: Critic.Enabled=false → Monitor never called.
func TestCriticDisabledNotMonitored(t *testing.T) {
h := &fakeCriticHandle{}
fp := fake.New("fake")
fp.Enqueue("m", fake.Reply("done"))
m, _ := fp.Model("m")
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
Ports: run.Ports{Critic: &fakeCritic{h: h}},
})
ex.Run(context.Background(),
run.RunnableAgent{Name: "x", ModelTier: "m"}, // Critic.Enabled=false
tool.Invocation{RunID: "r"}, "go")
h.mu.Lock()
defer h.mu.Unlock()
if h.stops != 0 || h.steps != 0 {
t.Errorf("disabled critic should not be monitored: steps=%d stops=%d", h.steps, h.stops)
}
}
+88
View File
@@ -0,0 +1,88 @@
package run_test
import (
"context"
"errors"
"testing"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
"gitea.stevedudenhoeffer.com/steve/executus/deliver"
"gitea.stevedudenhoeffer.com/steve/executus/run"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
type recordingDelivery struct {
target deliver.Target
output string
errored error
delivers int
}
func (d *recordingDelivery) Deliver(_ context.Context, t deliver.Target, output string, _ []deliver.Artifact) (string, error) {
d.target, d.output, d.delivers = t, output, d.delivers+1
return "msg-1", nil
}
func (d *recordingDelivery) DeliverError(_ context.Context, t deliver.Target, e error) error {
d.target, d.errored = t, e
return nil
}
func TestDeliveryWired(t *testing.T) {
d := &recordingDelivery{}
fp := fake.New("fake")
fp.Enqueue("m", fake.Reply("the output"))
m, _ := fp.Model("m")
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
Ports: run.Ports{Delivery: d},
})
// With a delivery target, the executor posts the output.
ex.Run(context.Background(),
run.RunnableAgent{Name: "x", ModelTier: "m"},
tool.Invocation{RunID: "r", DeliveryKind: "channel", DeliveryID: "chan-9"}, "go")
if d.delivers != 1 || d.output != "the output" || d.target.ID != "chan-9" || d.target.Kind != "channel" {
t.Fatalf("delivery wrong: %+v out=%q", d.target, d.output)
}
}
func TestNoDeliveryWithoutTarget(t *testing.T) {
d := &recordingDelivery{}
fp := fake.New("fake")
fp.Enqueue("m", fake.Reply("x"))
m, _ := fp.Model("m")
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
Ports: run.Ports{Delivery: d},
})
// No DeliveryID → executor delivers nothing (caller reads Result.Output).
ex.Run(context.Background(),
run.RunnableAgent{Name: "x", ModelTier: "m"},
tool.Invocation{RunID: "r"}, "go")
if d.delivers != 0 {
t.Errorf("no target should mean no delivery, got %d", d.delivers)
}
}
func TestDeliveryErrorPath(t *testing.T) {
d := &recordingDelivery{}
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) {
return ctx, nil, errors.New("resolve boom") // forces a run error
},
Ports: run.Ports{Delivery: d},
})
ex.Run(context.Background(),
run.RunnableAgent{Name: "x", ModelTier: "m"},
tool.Invocation{RunID: "r", DeliveryKind: "channel", DeliveryID: "chan-9"}, "go")
// A model-resolve error returns before the run context exists, so delivery
// isn't reached — assert no spurious Deliver. (DeliverError on in-loop errors
// is exercised by the wiring; this guards the early-return path.)
if d.delivers != 0 {
t.Errorf("early failure should not Deliver, got %d", d.delivers)
}
}
+38 -6
View File
@@ -10,6 +10,7 @@ import (
"gitea.stevedudenhoeffer.com/steve/majordomo/llm" "gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/executus/compact" "gitea.stevedudenhoeffer.com/steve/executus/compact"
"gitea.stevedudenhoeffer.com/steve/executus/deliver"
"gitea.stevedudenhoeffer.com/steve/executus/tool" "gitea.stevedudenhoeffer.com/steve/executus/tool"
) )
@@ -27,6 +28,7 @@ type Defaults struct {
MaxConsecutiveToolErrors int // loop guard; default 3 MaxConsecutiveToolErrors int // loop guard; default 3
MaxSameToolCallRepeats int // retry-storm guard; default 3 MaxSameToolCallRepeats int // retry-storm guard; default 3
CompactionThresholdRatio float64 // fraction of model context to compact at; default 0.7 CompactionThresholdRatio float64 // fraction of model context to compact at; default 0.7
CriticSoftTimeout time.Duration // idle window before the critic wakes; default 90s
} }
func (d Defaults) withFallbacks() Defaults { func (d Defaults) withFallbacks() Defaults {
@@ -48,6 +50,9 @@ func (d Defaults) withFallbacks() Defaults {
if d.CompactionThresholdRatio <= 0 { if d.CompactionThresholdRatio <= 0 {
d.CompactionThresholdRatio = 0.7 d.CompactionThresholdRatio = 0.7
} }
if d.CriticSoftTimeout <= 0 {
d.CriticSoftTimeout = 90 * time.Second
}
return d return d
} }
@@ -141,10 +146,7 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
// Audit start (optional). The recorder satisfies RunTally; stamp it on the // Audit start (optional). The recorder satisfies RunTally; stamp it on the
// invocation so a self-status tool can read live progress. // invocation so a self-status tool can read live progress.
var rec RunRecorder info := RunInfo{
var stateAcc *RunStateAccessor
if e.cfg.Ports.Audit != nil {
rec = e.cfg.Ports.Audit.StartRun(ctx, RunInfo{
RunID: inv.RunID, RunID: inv.RunID,
SubjectID: ra.ID, SubjectID: ra.ID,
Name: ra.Name, Name: ra.Name,
@@ -153,7 +155,11 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
ParentRunID: inv.ParentRunID, ParentRunID: inv.ParentRunID,
Inputs: inv.SkillInputs, Inputs: inv.SkillInputs,
StartedAt: started, StartedAt: started,
}) }
var rec RunRecorder
var stateAcc *RunStateAccessor
if e.cfg.Ports.Audit != nil {
rec = e.cfg.Ports.Audit.StartRun(ctx, info)
} }
if rec != nil { if rec != nil {
stateAcc = NewRunStateAccessor(rec, maxIter, 0, started) stateAcc = NewRunStateAccessor(rec, maxIter, 0, started)
@@ -186,6 +192,12 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
runCtx, mergeCancel := MergeCancellation(runCtx, ctx) runCtx, mergeCancel := MergeCancellation(runCtx, ctx)
defer mergeCancel() defer mergeCancel()
// Critic (optional): monitors the run for a stall, can nudge/extend/kill via
// its host Escalator. Its hard deadline is bound to runCtx (cancel on pass).
// nil-safe: no-op when no critic is configured or the agent doesn't enable it.
critic, stopCritic := e.startCritic(runCtx, cancel, ra, info)
defer stopCritic()
// Step instrumentation: accumulate Result.Steps + fire inv.OnStep, feed the // Step instrumentation: accumulate Result.Steps + fire inv.OnStep, feed the
// audit recorder, and keep the live iteration counter fresh. majordomo's // audit recorder, and keep the live iteration counter fresh. majordomo's
// step observer hands us each completed iteration; we zip the model's tool // step observer hands us each completed iteration; we zip the model's tool
@@ -200,6 +212,7 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
if rec != nil { if rec != nil {
rec.OnStep(s.Index, s.Response) rec.OnStep(s.Index, s.Response)
} }
critic.recordStep(s.Index) // keep the critic's activity clock fresh
var calls []llm.ToolCall var calls []llm.ToolCall
if s.Response != nil { if s.Response != nil {
calls = s.Response.ToolCalls calls = s.Response.ToolCalls
@@ -210,6 +223,7 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
} }
for i := 0; i < n; i++ { for i := 0; i < n; i++ {
call, r := calls[i], s.Results[i] call, r := calls[i], s.Results[i]
critic.recordToolStart(call.Name, string(call.Arguments))
emitter.toolStart(runCtx, call.Name, call.Arguments) emitter.toolStart(runCtx, call.Name, call.Arguments)
emitter.toolEnd(runCtx, call, r.Content, r.IsError) emitter.toolEnd(runCtx, call, r.Content, r.IsError)
if rec != nil { if rec != nil {
@@ -244,7 +258,7 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
} }
ag := agent.New(model, e.systemPrompt(ra), opts...) ag := agent.New(model, e.systemPrompt(ra), opts...)
runRes, runErr := ag.Run(runCtx, input) runRes, runErr := ag.Run(runCtx, input, critic.steerOptions()...)
status := statusFor(runErr) status := statusFor(runErr)
if runRes != nil { if runRes != nil {
@@ -258,6 +272,7 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
if e.cfg.Ports.Budget != nil { if e.cfg.Ports.Budget != nil {
e.cfg.Ports.Budget.Commit(detach(ctx), inv.CallerID, time.Since(started).Seconds()) e.cfg.Ports.Budget.Commit(detach(ctx), inv.CallerID, time.Since(started).Seconds())
} }
e.deliver(ctx, inv, res, runErr)
return res return res
} }
@@ -316,6 +331,23 @@ func (e *Executor) compactionThreshold(tier string) int {
return int(float64(max) * e.cfg.Defaults.CompactionThresholdRatio) return int(float64(max) * e.cfg.Defaults.CompactionThresholdRatio)
} }
// deliver posts the run's output (or error) via run.Ports.Delivery when both a
// Delivery and a target (inv.DeliveryID) are set. No target = the caller reads
// Result.Output itself (the synchronous default). Best-effort + detached: a
// delivery failure must not change the run's outcome.
func (e *Executor) deliver(ctx context.Context, inv tool.Invocation, res Result, runErr error) {
if e.cfg.Ports.Delivery == nil || inv.DeliveryID == "" {
return
}
target := deliver.Target{Kind: inv.DeliveryKind, ID: inv.DeliveryID}
dctx := detach(ctx)
if runErr != nil {
_ = e.cfg.Ports.Delivery.DeliverError(dctx, target, runErr)
return
}
_, _ = e.cfg.Ports.Delivery.Deliver(dctx, target, res.Output, nil)
}
// detach derives a bounded cleanup context off ctx, detached from its // detach derives a bounded cleanup context off ctx, detached from its
// cancellation, for post-run writes. The cancel is intentionally not returned; // cancellation, for post-run writes. The cancel is intentionally not returned;
// CleanupContextTimeout bounds the lifetime. // CleanupContextTimeout bounds the lifetime.
+7
View File
@@ -173,6 +173,13 @@ type Invocation struct {
CallerID string CallerID string
ChannelID string ChannelID string
GuildID string GuildID string
// DeliveryKind / DeliveryID name where the executor posts the run's output
// via run.Ports.Delivery — a host-interpreted Target ("channel"/"dm"/
// "thread"/...). An empty DeliveryID means the executor delivers nothing
// and the caller reads Result.Output itself (the synchronous default; the
// `.agent run` canary works this way).
DeliveryKind string
DeliveryID string
// CallerIsAdmin is true when the caller is a mort admin (Member.Admin). // CallerIsAdmin is true when the caller is a mort admin (Member.Admin).
// Populated by the executor at run dispatch via Bot.GetMember; defaults // Populated by the executor at run dispatch via Bot.GetMember; defaults
// to false on any lookup failure (member not found, DB error, empty // to false on any lookup failure (member not found, DB error, empty