3 Commits

Author SHA1 Message Date
steve 97154395e6 C0b: document recordToolStart post-iteration timing (gadfly glm finding)
executus CI / test (pull_request) Failing after 59s
executus CI / test (push) Failing after 1m1s
majordomo's step observer fires post-iteration, so the critic's activity clock
refreshes per-iteration, not mid-tool — a single long tool call won't refresh it
until it returns. Documented + the host-progress-bridge mitigation (mort's
pattern). A true pre-dispatch hook needs majordomo support (follow-up).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 10:10:56 -04:00
steve 4aa06f652e C0b: address verified gadfly findings (panic-safety + test honesty)
executus CI / test (pull_request) Failing after 58s
From PR #9 (minimax + deepseek):
- Run now has a top-level recover() — the "never propagates a panic" promise was
  unenforced; a panicking host Port (Critic/Audit/Palette) on the run goroutine
  now becomes Result.Err instead of unwinding into the caller.
- The critic deadline-watch goroutine recovers panics from a host Deadline()
  (it's a separate goroutine, so Run's recover can't catch it) — a buggy
  CriticHandle can't crash the process.
- CriticHandle interface documents its concurrency contract (Record*/Steer on the
  run goroutine vs Deadline()/Stop() from the watch goroutine — impls must be
  concurrent-safe; the critic battery already is).
- startCritic's dead `soft <= 0 -> noop` guard (withFallbacks already coerces to
  90s) replaced with a defensive inline 90s default, so a bypass of withFallbacks
  still gets a working critic instead of silently none.
- Delivery tests made honest: the old "error path" test only checked the
  early-return (no delivery); added TestDeliverErrorOnRunFailure (in-loop model
  error -> DeliverError to the target) + renamed the early-return test.

Graded all #9 findings in the gadfly MCP.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 10:09:22 -04:00
steve 43b2471737 C0b: wire Critic + Delivery into run.Executor
executus CI / test (pull_request) Failing after 1m0s
Adversarial Review (Gadfly) / review (pull_request) Successful in 5m9s
Continues finishing the executor's run.Ports wiring (after C0's Palette).

Critic (run/critic.go): when Ports.Critic is set and the agent enables it, the
executor calls Monitor at run start, feeds RecordStep/RecordToolStart from the
step observer, drains the critic's Steer messages into the loop via
agent.WithSteer, and binds the run's hard cancellation to the critic's
(extendable) Deadline through a watch goroutine — a healthy-but-slow run gets
room while a hung one is killed. Stop() on run end. Soft timeout from
Defaults.CriticSoftTimeout (default 90s). nil-safe: no critic / not-enabled =
no-op.

Delivery (run/executor.go deliver): after the run, when Ports.Delivery is set
and inv.DeliveryID is non-empty, the executor posts Result.Output (or
DeliverError on failure) to a host-interpreted deliver.Target
{inv.DeliveryKind, inv.DeliveryID}. Empty target = caller reads Result.Output
itself (the synchronous default; the `.agent run` canary). Best-effort +
detached.

tool.Invocation gains DeliveryKind/DeliveryID (host-set egress target).

Tests: critic monitored/fed/steered/stopped when enabled, untouched when not;
delivery posts on a target, skips without one. Deferred: Checkpointer (needs a
majordomo hook to snapshot the running message history).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 10:00:05 -04:00
7 changed files with 371 additions and 17 deletions
+4 -3
View File
@@ -47,9 +47,10 @@ CORE (majordomo + stdlib):
toolbox + majordomo loop + compaction +
run-bounding (V10 detached timeout) + step/
audit observers + Budget gate; RunnableAgent
DTO + nil-safe run.Ports. Palette delegation
WIRED (skill__/agent__ tools, C0). Follow-ups:
wire Critic/Checkpointer/Delivery, Phases [C0b]
DTO + nil-safe run.Ports. Palette delegation +
Critic (monitor/deadline/steer) + Delivery
WIRED. Follow-ups: Checkpointer (needs a
majordomo msg-history hook), Phases [C0c]
dispatchguard/ loop/depth/fan-out caps [P0 ✓]
pendingattach/ attachment dedupe [P0 ✓]
tool/ registry + 3-stage permissions + ssrf [P1 ✓]
+98
View File
@@ -0,0 +1,98 @@
package run
import (
"context"
"time"
"gitea.stevedudenhoeffer.com/steve/majordomo/agent"
)
// criticDeadlineCheck is how often the deadline-watch goroutine polls the
// critic's hard deadline. Small relative to any realistic soft timeout.
const criticDeadlineCheck = time.Second
// criticBinding wires a CriticHandle into a run: the executor forwards activity
// (steps + tool starts) to it, binds the run's hard cancellation to the critic's
// extendable deadline, and exposes the critic's Steer messages as an agent
// RunOption. All methods are nil-safe so the executor can call them
// unconditionally when no critic is configured.
type criticBinding struct {
h CriticHandle
}
// startCritic begins critic monitoring for this run when one is configured and
// the agent enables it. It launches a goroutine that cancels runCtx (via cancel)
// the moment the critic's hard deadline passes — the critic may extend that
// deadline, so a healthy-but-slow run is given room while a hung one is killed.
// Returns (nil, no-op stop) when there is no critic. The caller MUST defer the
// returned stop.
func (e *Executor) startCritic(runCtx context.Context, cancel context.CancelFunc, ra RunnableAgent, info RunInfo) (*criticBinding, func()) {
noop := func() {}
if e.cfg.Ports.Critic == nil || !ra.Critic.Enabled {
return nil, noop
}
soft := e.cfg.Defaults.CriticSoftTimeout
if soft <= 0 {
soft = 90 * time.Second // defensive: withFallbacks normally guarantees >0
}
h := e.cfg.Ports.Critic.Monitor(runCtx, info, soft)
if h == nil {
return nil, noop
}
done := make(chan struct{})
go func() {
// A host CriticHandle.Deadline() that panics must not crash the process
// (this runs on its own goroutine, so the executor's top-level recover
// can't catch it). Log-free best-effort: just stop watching.
defer func() { _ = recover() }()
t := time.NewTicker(criticDeadlineCheck)
defer t.Stop()
for {
select {
case <-done:
return
case <-runCtx.Done():
return
case <-t.C:
// A zero deadline = no hard cap (not yet set); otherwise cancel
// once we're at or past it.
if d := h.Deadline(); !d.IsZero() && !time.Now().Before(d) {
cancel()
return
}
}
}
}()
return &criticBinding{h: h}, func() {
close(done)
h.Stop()
}
}
func (b *criticBinding) recordStep(iter int) {
if b != nil {
b.h.RecordStep(iter)
}
}
// recordToolStart forwards a tool call to the critic. NOTE: majordomo's step
// observer only fires AFTER an iteration completes, so this currently lands
// post-tool, not at dispatch — the activity clock is refreshed once per
// iteration, not mid-tool. A single very long tool call (e.g. a 30-min render)
// therefore won't refresh the clock until it returns; a host that runs such
// tools should feed interim progress to its Critic (mort's InstallProgressBridge
// pattern). A true pre-dispatch refresh needs a majordomo hook (follow-up).
func (b *criticBinding) recordToolStart(name, args string) {
if b != nil {
b.h.RecordToolStart(name, args)
}
}
// steerOptions returns the agent RunOptions that drain the critic's steer
// messages into the loop. Empty when there is no critic.
func (b *criticBinding) steerOptions() []agent.RunOption {
if b == nil {
return nil
}
return []agent.RunOption{agent.WithSteer(b.h.Steer)}
}
+88
View File
@@ -0,0 +1,88 @@
package run_test
import (
"context"
"sync"
"testing"
"time"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
"gitea.stevedudenhoeffer.com/steve/executus/run"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
type fakeCritic struct{ h *fakeCriticHandle }
func (c *fakeCritic) Monitor(_ context.Context, _ run.RunInfo, _ time.Duration) run.CriticHandle {
return c.h
}
type fakeCriticHandle struct {
mu sync.Mutex
steps, tools, stops int
steered int
}
func (h *fakeCriticHandle) RecordStep(int) { h.mu.Lock(); h.steps++; h.mu.Unlock() }
func (h *fakeCriticHandle) RecordToolStart(string, string) {
h.mu.Lock()
h.tools++
h.mu.Unlock()
}
func (h *fakeCriticHandle) Steer() []llm.Message { h.mu.Lock(); h.steered++; h.mu.Unlock(); return nil }
func (h *fakeCriticHandle) Deadline() time.Time { return time.Time{} } // no hard deadline
func (h *fakeCriticHandle) Stop() { h.mu.Lock(); h.stops++; h.mu.Unlock() }
// TestCriticWired: an agent with Critic.Enabled gets monitored — Monitor returns
// a handle the executor feeds (RecordStep), drains (Steer), and stops.
func TestCriticWired(t *testing.T) {
h := &fakeCriticHandle{}
fp := fake.New("fake")
fp.Enqueue("m", fake.Reply("done"))
m, _ := fp.Model("m")
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
Ports: run.Ports{Critic: &fakeCritic{h: h}},
})
res := ex.Run(context.Background(),
run.RunnableAgent{Name: "watched", ModelTier: "m", Critic: run.CriticConfig{Enabled: true}},
tool.Invocation{RunID: "r"}, "go")
if res.Err != nil {
t.Fatalf("run error: %v", res.Err)
}
h.mu.Lock()
defer h.mu.Unlock()
if h.steps < 1 {
t.Errorf("critic should have seen >=1 step, got %d", h.steps)
}
if h.steered < 1 {
t.Errorf("critic Steer should be drained at least once, got %d", h.steered)
}
if h.stops != 1 {
t.Errorf("critic Stop should be called exactly once, got %d", h.stops)
}
}
// TestCriticDisabledNotMonitored: Critic.Enabled=false → Monitor never called.
func TestCriticDisabledNotMonitored(t *testing.T) {
h := &fakeCriticHandle{}
fp := fake.New("fake")
fp.Enqueue("m", fake.Reply("done"))
m, _ := fp.Model("m")
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
Ports: run.Ports{Critic: &fakeCritic{h: h}},
})
ex.Run(context.Background(),
run.RunnableAgent{Name: "x", ModelTier: "m"}, // Critic.Enabled=false
tool.Invocation{RunID: "r"}, "go")
h.mu.Lock()
defer h.mu.Unlock()
if h.stops != 0 || h.steps != 0 {
t.Errorf("disabled critic should not be monitored: steps=%d stops=%d", h.steps, h.stops)
}
}
+114
View File
@@ -0,0 +1,114 @@
package run_test
import (
"context"
"errors"
"testing"
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
"gitea.stevedudenhoeffer.com/steve/executus/deliver"
"gitea.stevedudenhoeffer.com/steve/executus/run"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
type recordingDelivery struct {
target deliver.Target
output string
errored error
delivers int
}
func (d *recordingDelivery) Deliver(_ context.Context, t deliver.Target, output string, _ []deliver.Artifact) (string, error) {
d.target, d.output, d.delivers = t, output, d.delivers+1
return "msg-1", nil
}
func (d *recordingDelivery) DeliverError(_ context.Context, t deliver.Target, e error) error {
d.target, d.errored = t, e
return nil
}
func TestDeliveryWired(t *testing.T) {
d := &recordingDelivery{}
fp := fake.New("fake")
fp.Enqueue("m", fake.Reply("the output"))
m, _ := fp.Model("m")
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
Ports: run.Ports{Delivery: d},
})
// With a delivery target, the executor posts the output.
ex.Run(context.Background(),
run.RunnableAgent{Name: "x", ModelTier: "m"},
tool.Invocation{RunID: "r", DeliveryKind: "channel", DeliveryID: "chan-9"}, "go")
if d.delivers != 1 || d.output != "the output" || d.target.ID != "chan-9" || d.target.Kind != "channel" {
t.Fatalf("delivery wrong: %+v out=%q", d.target, d.output)
}
}
func TestNoDeliveryWithoutTarget(t *testing.T) {
d := &recordingDelivery{}
fp := fake.New("fake")
fp.Enqueue("m", fake.Reply("x"))
m, _ := fp.Model("m")
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
Ports: run.Ports{Delivery: d},
})
// No DeliveryID → executor delivers nothing (caller reads Result.Output).
ex.Run(context.Background(),
run.RunnableAgent{Name: "x", ModelTier: "m"},
tool.Invocation{RunID: "r"}, "go")
if d.delivers != 0 {
t.Errorf("no target should mean no delivery, got %d", d.delivers)
}
}
// TestNoDeliveryOnEarlyResolveError: an error BEFORE the run starts (model
// resolve) returns before delivery is reached — neither Deliver nor DeliverError
// fires. (Delivery covers run OUTCOMES, not pre-run setup failures.)
func TestNoDeliveryOnEarlyResolveError(t *testing.T) {
d := &recordingDelivery{}
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) {
return ctx, nil, errors.New("resolve boom")
},
Ports: run.Ports{Delivery: d},
})
ex.Run(context.Background(),
run.RunnableAgent{Name: "x", ModelTier: "m"},
tool.Invocation{RunID: "r", DeliveryKind: "channel", DeliveryID: "chan-9"}, "go")
if d.delivers != 0 || d.errored != nil {
t.Errorf("early resolve failure should neither Deliver nor DeliverError: delivers=%d errored=%v", d.delivers, d.errored)
}
}
// TestDeliverErrorOnRunFailure: an in-loop run failure (the model errors) routes
// through DeliverError with the run error.
func TestDeliverErrorOnRunFailure(t *testing.T) {
d := &recordingDelivery{}
fp := fake.New("fake")
fp.Enqueue("m", fake.Step{Err: errors.New("model boom")}) // model errors mid-run
m, _ := fp.Model("m")
ex := run.New(run.Config{
Registry: tool.NewRegistry(),
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil },
Ports: run.Ports{Delivery: d},
})
res := ex.Run(context.Background(),
run.RunnableAgent{Name: "x", ModelTier: "m"},
tool.Invocation{RunID: "r", DeliveryKind: "channel", DeliveryID: "chan-9"}, "go")
if res.Err == nil {
t.Fatal("expected a run error")
}
if d.delivers != 0 {
t.Errorf("a failed run should not Deliver (success path), got %d", d.delivers)
}
if d.errored == nil || d.target.ID != "chan-9" {
t.Errorf("a failed run with a target should DeliverError to chan-9, got errored=%v target=%+v", d.errored, d.target)
}
}
+55 -14
View File
@@ -10,6 +10,7 @@ import (
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
"gitea.stevedudenhoeffer.com/steve/executus/compact"
"gitea.stevedudenhoeffer.com/steve/executus/deliver"
"gitea.stevedudenhoeffer.com/steve/executus/tool"
)
@@ -27,6 +28,7 @@ type Defaults struct {
MaxConsecutiveToolErrors int // loop guard; default 3
MaxSameToolCallRepeats int // retry-storm guard; default 3
CompactionThresholdRatio float64 // fraction of model context to compact at; default 0.7
CriticSoftTimeout time.Duration // idle window before the critic wakes; default 90s
}
func (d Defaults) withFallbacks() Defaults {
@@ -48,6 +50,9 @@ func (d Defaults) withFallbacks() Defaults {
if d.CompactionThresholdRatio <= 0 {
d.CompactionThresholdRatio = 0.7
}
if d.CriticSoftTimeout <= 0 {
d.CriticSoftTimeout = 90 * time.Second
}
return d
}
@@ -99,10 +104,19 @@ type Result struct {
}
// Run executes ra with the given invocation + input and returns the Result. It
// never propagates a panic; failures surface in Result.Err.
func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocation, input string) Result {
// never propagates a panic; failures surface in Result.Err (a top-level recover
// converts any panic — including from a host Port — into a run error).
func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocation, input string) (res Result) {
started := time.Now()
res := Result{RunID: inv.RunID}
res = Result{RunID: inv.RunID}
// Enforce the no-panic contract: a panic anywhere in the run (incl. a host
// Critic/Audit/Palette callback on the main goroutine) becomes Result.Err
// rather than unwinding into the caller.
defer func() {
if r := recover(); r != nil {
res.Err = fmt.Errorf("run.Executor: recovered panic: %v", r)
}
}()
tier := ra.ModelTier
if tier == "" {
@@ -141,19 +155,20 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
// Audit start (optional). The recorder satisfies RunTally; stamp it on the
// invocation so a self-status tool can read live progress.
info := RunInfo{
RunID: inv.RunID,
SubjectID: ra.ID,
Name: ra.Name,
CallerID: inv.CallerID,
ChannelID: inv.ChannelID,
ParentRunID: inv.ParentRunID,
Inputs: inv.SkillInputs,
StartedAt: started,
}
var rec RunRecorder
var stateAcc *RunStateAccessor
if e.cfg.Ports.Audit != nil {
rec = e.cfg.Ports.Audit.StartRun(ctx, RunInfo{
RunID: inv.RunID,
SubjectID: ra.ID,
Name: ra.Name,
CallerID: inv.CallerID,
ChannelID: inv.ChannelID,
ParentRunID: inv.ParentRunID,
Inputs: inv.SkillInputs,
StartedAt: started,
})
rec = e.cfg.Ports.Audit.StartRun(ctx, info)
}
if rec != nil {
stateAcc = NewRunStateAccessor(rec, maxIter, 0, started)
@@ -186,6 +201,12 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
runCtx, mergeCancel := MergeCancellation(runCtx, ctx)
defer mergeCancel()
// Critic (optional): monitors the run for a stall, can nudge/extend/kill via
// its host Escalator. Its hard deadline is bound to runCtx (cancel on pass).
// nil-safe: no-op when no critic is configured or the agent doesn't enable it.
critic, stopCritic := e.startCritic(runCtx, cancel, ra, info)
defer stopCritic()
// Step instrumentation: accumulate Result.Steps + fire inv.OnStep, feed the
// audit recorder, and keep the live iteration counter fresh. majordomo's
// step observer hands us each completed iteration; we zip the model's tool
@@ -200,6 +221,7 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
if rec != nil {
rec.OnStep(s.Index, s.Response)
}
critic.recordStep(s.Index) // keep the critic's activity clock fresh
var calls []llm.ToolCall
if s.Response != nil {
calls = s.Response.ToolCalls
@@ -210,6 +232,7 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
}
for i := 0; i < n; i++ {
call, r := calls[i], s.Results[i]
critic.recordToolStart(call.Name, string(call.Arguments))
emitter.toolStart(runCtx, call.Name, call.Arguments)
emitter.toolEnd(runCtx, call, r.Content, r.IsError)
if rec != nil {
@@ -244,7 +267,7 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
}
ag := agent.New(model, e.systemPrompt(ra), opts...)
runRes, runErr := ag.Run(runCtx, input)
runRes, runErr := ag.Run(runCtx, input, critic.steerOptions()...)
status := statusFor(runErr)
if runRes != nil {
@@ -258,6 +281,7 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
if e.cfg.Ports.Budget != nil {
e.cfg.Ports.Budget.Commit(detach(ctx), inv.CallerID, time.Since(started).Seconds())
}
e.deliver(ctx, inv, res, runErr)
return res
}
@@ -316,6 +340,23 @@ func (e *Executor) compactionThreshold(tier string) int {
return int(float64(max) * e.cfg.Defaults.CompactionThresholdRatio)
}
// deliver posts the run's output (or error) via run.Ports.Delivery when both a
// Delivery and a target (inv.DeliveryID) are set. No target = the caller reads
// Result.Output itself (the synchronous default). Best-effort + detached: a
// delivery failure must not change the run's outcome.
func (e *Executor) deliver(ctx context.Context, inv tool.Invocation, res Result, runErr error) {
if e.cfg.Ports.Delivery == nil || inv.DeliveryID == "" {
return
}
target := deliver.Target{Kind: inv.DeliveryKind, ID: inv.DeliveryID}
dctx := detach(ctx)
if runErr != nil {
_ = e.cfg.Ports.Delivery.DeliverError(dctx, target, runErr)
return
}
_, _ = e.cfg.Ports.Delivery.Deliver(dctx, target, res.Output, nil)
}
// detach derives a bounded cleanup context off ctx, detached from its
// cancellation, for post-run writes. The cancel is intentionally not returned;
// CleanupContextTimeout bounds the lifetime.
+5
View File
@@ -113,6 +113,11 @@ type Critic interface {
}
// CriticHandle is the executor's live link to a run's critic.
//
// Concurrency: the executor calls RecordStep/RecordToolStart/Steer from the run
// goroutine while a separate watch goroutine polls Deadline() and the run's end
// calls Stop() — so implementations MUST be safe for concurrent use across these
// methods (the critic battery's handle guards its state with a mutex).
type CriticHandle interface {
// RecordStep / RecordToolStart keep the critic's activity clock fresh so a
// healthy-but-slow run is not mistaken for a hang.
+7
View File
@@ -173,6 +173,13 @@ type Invocation struct {
CallerID string
ChannelID string
GuildID string
// DeliveryKind / DeliveryID name where the executor posts the run's output
// via run.Ports.Delivery — a host-interpreted Target ("channel"/"dm"/
// "thread"/...). An empty DeliveryID means the executor delivers nothing
// and the caller reads Result.Output itself (the synchronous default; the
// `.agent run` canary works this way).
DeliveryKind string
DeliveryID string
// CallerIsAdmin is true when the caller is a mort admin (Member.Admin).
// Populated by the executor at run dispatch via Bot.GetMember; defaults
// to false on any lookup failure (member not found, DB error, empty