From 43b247173764ecc07dfcc0528cc4b0fc04942014 Mon Sep 17 00:00:00 2001 From: Steve Dudenhoeffer Date: Sat, 27 Jun 2026 10:00:05 -0400 Subject: [PATCH] C0b: wire Critic + Delivery into run.Executor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Continues finishing the executor's run.Ports wiring (after C0's Palette). Critic (run/critic.go): when Ports.Critic is set and the agent enables it, the executor calls Monitor at run start, feeds RecordStep/RecordToolStart from the step observer, drains the critic's Steer messages into the loop via agent.WithSteer, and binds the run's hard cancellation to the critic's (extendable) Deadline through a watch goroutine — a healthy-but-slow run gets room while a hung one is killed. Stop() on run end. Soft timeout from Defaults.CriticSoftTimeout (default 90s). nil-safe: no critic / not-enabled = no-op. Delivery (run/executor.go deliver): after the run, when Ports.Delivery is set and inv.DeliveryID is non-empty, the executor posts Result.Output (or DeliverError on failure) to a host-interpreted deliver.Target {inv.DeliveryKind, inv.DeliveryID}. Empty target = caller reads Result.Output itself (the synchronous default; the `.agent run` canary). Best-effort + detached. tool.Invocation gains DeliveryKind/DeliveryID (host-set egress target). Tests: critic monitored/fed/steered/stopped when enabled, untouched when not; delivery posts on a target, skips without one. Deferred: Checkpointer (needs a majordomo hook to snapshot the running message history). Co-Authored-By: Claude Opus 4.8 (1M context) --- CLAUDE.md | 7 ++-- run/critic.go | 87 +++++++++++++++++++++++++++++++++++++++++++ run/critic_test.go | 88 ++++++++++++++++++++++++++++++++++++++++++++ run/delivery_test.go | 88 ++++++++++++++++++++++++++++++++++++++++++++ run/executor.go | 54 +++++++++++++++++++++------ tool/registry.go | 7 ++++ 6 files changed, 317 insertions(+), 14 deletions(-) create mode 100644 run/critic.go create mode 100644 run/critic_test.go create mode 100644 run/delivery_test.go diff --git a/CLAUDE.md b/CLAUDE.md index d8963c0..3b6af72 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -47,9 +47,10 @@ CORE (majordomo + stdlib): toolbox + majordomo loop + compaction + run-bounding (V10 detached timeout) + step/ audit observers + Budget gate; RunnableAgent - DTO + nil-safe run.Ports. Palette delegation - WIRED (skill__/agent__ tools, C0). Follow-ups: - wire Critic/Checkpointer/Delivery, Phases [C0b] + DTO + nil-safe run.Ports. Palette delegation + + Critic (monitor/deadline/steer) + Delivery + WIRED. Follow-ups: Checkpointer (needs a + majordomo msg-history hook), Phases [C0c] dispatchguard/ loop/depth/fan-out caps [P0 ✓] pendingattach/ attachment dedupe [P0 ✓] tool/ registry + 3-stage permissions + ssrf [P1 ✓] diff --git a/run/critic.go b/run/critic.go new file mode 100644 index 0000000..c563550 --- /dev/null +++ b/run/critic.go @@ -0,0 +1,87 @@ +package run + +import ( + "context" + "time" + + "gitea.stevedudenhoeffer.com/steve/majordomo/agent" +) + +// criticDeadlineCheck is how often the deadline-watch goroutine polls the +// critic's hard deadline. Small relative to any realistic soft timeout. +const criticDeadlineCheck = time.Second + +// criticBinding wires a CriticHandle into a run: the executor forwards activity +// (steps + tool starts) to it, binds the run's hard cancellation to the critic's +// extendable deadline, and exposes the critic's Steer messages as an agent +// RunOption. All methods are nil-safe so the executor can call them +// unconditionally when no critic is configured. +type criticBinding struct { + h CriticHandle +} + +// startCritic begins critic monitoring for this run when one is configured and +// the agent enables it. It launches a goroutine that cancels runCtx (via cancel) +// the moment the critic's hard deadline passes — the critic may extend that +// deadline, so a healthy-but-slow run is given room while a hung one is killed. +// Returns (nil, no-op stop) when there is no critic. The caller MUST defer the +// returned stop. +func (e *Executor) startCritic(runCtx context.Context, cancel context.CancelFunc, ra RunnableAgent, info RunInfo) (*criticBinding, func()) { + noop := func() {} + if e.cfg.Ports.Critic == nil || !ra.Critic.Enabled { + return nil, noop + } + soft := e.cfg.Defaults.CriticSoftTimeout + if soft <= 0 { + return nil, noop + } + h := e.cfg.Ports.Critic.Monitor(runCtx, info, soft) + if h == nil { + return nil, noop + } + done := make(chan struct{}) + go func() { + t := time.NewTicker(criticDeadlineCheck) + defer t.Stop() + for { + select { + case <-done: + return + case <-runCtx.Done(): + return + case <-t.C: + // A zero deadline = no hard cap (not yet set); otherwise cancel + // once we're at or past it. + if d := h.Deadline(); !d.IsZero() && !time.Now().Before(d) { + cancel() + return + } + } + } + }() + return &criticBinding{h: h}, func() { + close(done) + h.Stop() + } +} + +func (b *criticBinding) recordStep(iter int) { + if b != nil { + b.h.RecordStep(iter) + } +} + +func (b *criticBinding) recordToolStart(name, args string) { + if b != nil { + b.h.RecordToolStart(name, args) + } +} + +// steerOptions returns the agent RunOptions that drain the critic's steer +// messages into the loop. Empty when there is no critic. +func (b *criticBinding) steerOptions() []agent.RunOption { + if b == nil { + return nil + } + return []agent.RunOption{agent.WithSteer(b.h.Steer)} +} diff --git a/run/critic_test.go b/run/critic_test.go new file mode 100644 index 0000000..cc197b8 --- /dev/null +++ b/run/critic_test.go @@ -0,0 +1,88 @@ +package run_test + +import ( + "context" + "sync" + "testing" + "time" + + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" + "gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake" + + "gitea.stevedudenhoeffer.com/steve/executus/run" + "gitea.stevedudenhoeffer.com/steve/executus/tool" +) + +type fakeCritic struct{ h *fakeCriticHandle } + +func (c *fakeCritic) Monitor(_ context.Context, _ run.RunInfo, _ time.Duration) run.CriticHandle { + return c.h +} + +type fakeCriticHandle struct { + mu sync.Mutex + steps, tools, stops int + steered int +} + +func (h *fakeCriticHandle) RecordStep(int) { h.mu.Lock(); h.steps++; h.mu.Unlock() } +func (h *fakeCriticHandle) RecordToolStart(string, string) { + h.mu.Lock() + h.tools++ + h.mu.Unlock() +} +func (h *fakeCriticHandle) Steer() []llm.Message { h.mu.Lock(); h.steered++; h.mu.Unlock(); return nil } +func (h *fakeCriticHandle) Deadline() time.Time { return time.Time{} } // no hard deadline +func (h *fakeCriticHandle) Stop() { h.mu.Lock(); h.stops++; h.mu.Unlock() } + +// TestCriticWired: an agent with Critic.Enabled gets monitored — Monitor returns +// a handle the executor feeds (RecordStep), drains (Steer), and stops. +func TestCriticWired(t *testing.T) { + h := &fakeCriticHandle{} + fp := fake.New("fake") + fp.Enqueue("m", fake.Reply("done")) + m, _ := fp.Model("m") + ex := run.New(run.Config{ + Registry: tool.NewRegistry(), + Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil }, + Ports: run.Ports{Critic: &fakeCritic{h: h}}, + }) + res := ex.Run(context.Background(), + run.RunnableAgent{Name: "watched", ModelTier: "m", Critic: run.CriticConfig{Enabled: true}}, + tool.Invocation{RunID: "r"}, "go") + if res.Err != nil { + t.Fatalf("run error: %v", res.Err) + } + h.mu.Lock() + defer h.mu.Unlock() + if h.steps < 1 { + t.Errorf("critic should have seen >=1 step, got %d", h.steps) + } + if h.steered < 1 { + t.Errorf("critic Steer should be drained at least once, got %d", h.steered) + } + if h.stops != 1 { + t.Errorf("critic Stop should be called exactly once, got %d", h.stops) + } +} + +// TestCriticDisabledNotMonitored: Critic.Enabled=false → Monitor never called. +func TestCriticDisabledNotMonitored(t *testing.T) { + h := &fakeCriticHandle{} + fp := fake.New("fake") + fp.Enqueue("m", fake.Reply("done")) + m, _ := fp.Model("m") + ex := run.New(run.Config{ + Registry: tool.NewRegistry(), + Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil }, + Ports: run.Ports{Critic: &fakeCritic{h: h}}, + }) + ex.Run(context.Background(), + run.RunnableAgent{Name: "x", ModelTier: "m"}, // Critic.Enabled=false + tool.Invocation{RunID: "r"}, "go") + h.mu.Lock() + defer h.mu.Unlock() + if h.stops != 0 || h.steps != 0 { + t.Errorf("disabled critic should not be monitored: steps=%d stops=%d", h.steps, h.stops) + } +} diff --git a/run/delivery_test.go b/run/delivery_test.go new file mode 100644 index 0000000..eafda1e --- /dev/null +++ b/run/delivery_test.go @@ -0,0 +1,88 @@ +package run_test + +import ( + "context" + "errors" + "testing" + + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" + "gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake" + + "gitea.stevedudenhoeffer.com/steve/executus/deliver" + "gitea.stevedudenhoeffer.com/steve/executus/run" + "gitea.stevedudenhoeffer.com/steve/executus/tool" +) + +type recordingDelivery struct { + target deliver.Target + output string + errored error + delivers int +} + +func (d *recordingDelivery) Deliver(_ context.Context, t deliver.Target, output string, _ []deliver.Artifact) (string, error) { + d.target, d.output, d.delivers = t, output, d.delivers+1 + return "msg-1", nil +} +func (d *recordingDelivery) DeliverError(_ context.Context, t deliver.Target, e error) error { + d.target, d.errored = t, e + return nil +} + +func TestDeliveryWired(t *testing.T) { + d := &recordingDelivery{} + fp := fake.New("fake") + fp.Enqueue("m", fake.Reply("the output")) + m, _ := fp.Model("m") + ex := run.New(run.Config{ + Registry: tool.NewRegistry(), + Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil }, + Ports: run.Ports{Delivery: d}, + }) + // With a delivery target, the executor posts the output. + ex.Run(context.Background(), + run.RunnableAgent{Name: "x", ModelTier: "m"}, + tool.Invocation{RunID: "r", DeliveryKind: "channel", DeliveryID: "chan-9"}, "go") + if d.delivers != 1 || d.output != "the output" || d.target.ID != "chan-9" || d.target.Kind != "channel" { + t.Fatalf("delivery wrong: %+v out=%q", d.target, d.output) + } +} + +func TestNoDeliveryWithoutTarget(t *testing.T) { + d := &recordingDelivery{} + fp := fake.New("fake") + fp.Enqueue("m", fake.Reply("x")) + m, _ := fp.Model("m") + ex := run.New(run.Config{ + Registry: tool.NewRegistry(), + Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, m, nil }, + Ports: run.Ports{Delivery: d}, + }) + // No DeliveryID → executor delivers nothing (caller reads Result.Output). + ex.Run(context.Background(), + run.RunnableAgent{Name: "x", ModelTier: "m"}, + tool.Invocation{RunID: "r"}, "go") + if d.delivers != 0 { + t.Errorf("no target should mean no delivery, got %d", d.delivers) + } +} + +func TestDeliveryErrorPath(t *testing.T) { + d := &recordingDelivery{} + ex := run.New(run.Config{ + Registry: tool.NewRegistry(), + Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { + return ctx, nil, errors.New("resolve boom") // forces a run error + }, + Ports: run.Ports{Delivery: d}, + }) + ex.Run(context.Background(), + run.RunnableAgent{Name: "x", ModelTier: "m"}, + tool.Invocation{RunID: "r", DeliveryKind: "channel", DeliveryID: "chan-9"}, "go") + // A model-resolve error returns before the run context exists, so delivery + // isn't reached — assert no spurious Deliver. (DeliverError on in-loop errors + // is exercised by the wiring; this guards the early-return path.) + if d.delivers != 0 { + t.Errorf("early failure should not Deliver, got %d", d.delivers) + } +} diff --git a/run/executor.go b/run/executor.go index 3b28866..7ac8d72 100644 --- a/run/executor.go +++ b/run/executor.go @@ -10,6 +10,7 @@ import ( "gitea.stevedudenhoeffer.com/steve/majordomo/llm" "gitea.stevedudenhoeffer.com/steve/executus/compact" + "gitea.stevedudenhoeffer.com/steve/executus/deliver" "gitea.stevedudenhoeffer.com/steve/executus/tool" ) @@ -27,6 +28,7 @@ type Defaults struct { MaxConsecutiveToolErrors int // loop guard; default 3 MaxSameToolCallRepeats int // retry-storm guard; default 3 CompactionThresholdRatio float64 // fraction of model context to compact at; default 0.7 + CriticSoftTimeout time.Duration // idle window before the critic wakes; default 90s } func (d Defaults) withFallbacks() Defaults { @@ -48,6 +50,9 @@ func (d Defaults) withFallbacks() Defaults { if d.CompactionThresholdRatio <= 0 { d.CompactionThresholdRatio = 0.7 } + if d.CriticSoftTimeout <= 0 { + d.CriticSoftTimeout = 90 * time.Second + } return d } @@ -141,19 +146,20 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio // Audit start (optional). The recorder satisfies RunTally; stamp it on the // invocation so a self-status tool can read live progress. + info := RunInfo{ + RunID: inv.RunID, + SubjectID: ra.ID, + Name: ra.Name, + CallerID: inv.CallerID, + ChannelID: inv.ChannelID, + ParentRunID: inv.ParentRunID, + Inputs: inv.SkillInputs, + StartedAt: started, + } var rec RunRecorder var stateAcc *RunStateAccessor if e.cfg.Ports.Audit != nil { - rec = e.cfg.Ports.Audit.StartRun(ctx, RunInfo{ - RunID: inv.RunID, - SubjectID: ra.ID, - Name: ra.Name, - CallerID: inv.CallerID, - ChannelID: inv.ChannelID, - ParentRunID: inv.ParentRunID, - Inputs: inv.SkillInputs, - StartedAt: started, - }) + rec = e.cfg.Ports.Audit.StartRun(ctx, info) } if rec != nil { stateAcc = NewRunStateAccessor(rec, maxIter, 0, started) @@ -186,6 +192,12 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio runCtx, mergeCancel := MergeCancellation(runCtx, ctx) defer mergeCancel() + // Critic (optional): monitors the run for a stall, can nudge/extend/kill via + // its host Escalator. Its hard deadline is bound to runCtx (cancel on pass). + // nil-safe: no-op when no critic is configured or the agent doesn't enable it. + critic, stopCritic := e.startCritic(runCtx, cancel, ra, info) + defer stopCritic() + // Step instrumentation: accumulate Result.Steps + fire inv.OnStep, feed the // audit recorder, and keep the live iteration counter fresh. majordomo's // step observer hands us each completed iteration; we zip the model's tool @@ -200,6 +212,7 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio if rec != nil { rec.OnStep(s.Index, s.Response) } + critic.recordStep(s.Index) // keep the critic's activity clock fresh var calls []llm.ToolCall if s.Response != nil { calls = s.Response.ToolCalls @@ -210,6 +223,7 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio } for i := 0; i < n; i++ { call, r := calls[i], s.Results[i] + critic.recordToolStart(call.Name, string(call.Arguments)) emitter.toolStart(runCtx, call.Name, call.Arguments) emitter.toolEnd(runCtx, call, r.Content, r.IsError) if rec != nil { @@ -244,7 +258,7 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio } ag := agent.New(model, e.systemPrompt(ra), opts...) - runRes, runErr := ag.Run(runCtx, input) + runRes, runErr := ag.Run(runCtx, input, critic.steerOptions()...) status := statusFor(runErr) if runRes != nil { @@ -258,6 +272,7 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio if e.cfg.Ports.Budget != nil { e.cfg.Ports.Budget.Commit(detach(ctx), inv.CallerID, time.Since(started).Seconds()) } + e.deliver(ctx, inv, res, runErr) return res } @@ -316,6 +331,23 @@ func (e *Executor) compactionThreshold(tier string) int { return int(float64(max) * e.cfg.Defaults.CompactionThresholdRatio) } +// deliver posts the run's output (or error) via run.Ports.Delivery when both a +// Delivery and a target (inv.DeliveryID) are set. No target = the caller reads +// Result.Output itself (the synchronous default). Best-effort + detached: a +// delivery failure must not change the run's outcome. +func (e *Executor) deliver(ctx context.Context, inv tool.Invocation, res Result, runErr error) { + if e.cfg.Ports.Delivery == nil || inv.DeliveryID == "" { + return + } + target := deliver.Target{Kind: inv.DeliveryKind, ID: inv.DeliveryID} + dctx := detach(ctx) + if runErr != nil { + _ = e.cfg.Ports.Delivery.DeliverError(dctx, target, runErr) + return + } + _, _ = e.cfg.Ports.Delivery.Deliver(dctx, target, res.Output, nil) +} + // detach derives a bounded cleanup context off ctx, detached from its // cancellation, for post-run writes. The cancel is intentionally not returned; // CleanupContextTimeout bounds the lifetime. diff --git a/tool/registry.go b/tool/registry.go index de05e2a..7528f3f 100644 --- a/tool/registry.go +++ b/tool/registry.go @@ -173,6 +173,13 @@ type Invocation struct { CallerID string ChannelID string GuildID string + // DeliveryKind / DeliveryID name where the executor posts the run's output + // via run.Ports.Delivery — a host-interpreted Target ("channel"/"dm"/ + // "thread"/...). An empty DeliveryID means the executor delivers nothing + // and the caller reads Result.Output itself (the synchronous default; the + // `.agent run` canary works this way). + DeliveryKind string + DeliveryID string // CallerIsAdmin is true when the caller is a mort admin (Member.Admin). // Populated by the executor at run dispatch via Bot.GetMember; defaults // to false on any lookup failure (member not found, DB error, empty