From 4e179259deac900e6c3265f4292da8bc7939a701 Mon Sep 17 00:00:00 2001 From: Steve Dudenhoeffer Date: Sat, 27 Jun 2026 18:13:16 -0400 Subject: [PATCH] run: wire SessionToolFactory + PostRun artifacts + AttachImages MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The session-tool TYPES already lived in tool/ (P4 move) but the executor never used them. This wires them, unblocking artifact-producing host surfaces (mort's chat API / chatbot / .skill / scaddy) to run on executus: - run/session.go: steerMailbox (thread-safe message queue) + runSession (tool.AgentSession over it: AttachImages → a user-role multimodal message injected before the agent's next step) + runPostRun (panic-isolated hook call). - executor: create the mailbox + set inv.AttachImages BEFORE the toolbox build; add inv.ExtraTools + a SessionToolFactory's per-run Tools to the toolbox; defer its Cleanup; merge the session mailbox with the critic's nudges into ONE WithSteer; after the run, call PostRun with the full transcript (runRes.Messages) → Result.PostRunResult (best-effort, never fails the run). - run.Result += PostRunResult *tool.PostRunResult. - dropped the now-dead criticBinding.steerOptions (superseded by drainSteer). Tests: a factory whose PostRun emits an artifact from the output+transcript + Cleanup lands on Result.PostRunResult; a factory-added tool is callable. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- run/critic.go | 8 ++-- run/executor.go | 54 +++++++++++++++++++++++++- run/session.go | 76 ++++++++++++++++++++++++++++++++++++ run/session_test.go | 94 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 227 insertions(+), 5 deletions(-) create mode 100644 run/session.go create mode 100644 run/session_test.go diff --git a/run/critic.go b/run/critic.go index a8798dc..cbba6d6 100644 --- a/run/critic.go +++ b/run/critic.go @@ -114,11 +114,11 @@ func (b *criticBinding) maxStepsOption(base int) agent.Option { }) } -// steerOptions returns the agent RunOptions that drain the critic's steer -// messages into the loop. Empty when there is no critic. -func (b *criticBinding) steerOptions() []agent.RunOption { +// drainSteer returns the critic's queued steer messages (nil-safe), so the +// executor can merge them with the session steer mailbox into one WithSteer. +func (b *criticBinding) drainSteer() []llm.Message { if b == nil { return nil } - return []agent.RunOption{agent.WithSteer(b.h.Steer)} + return b.h.Steer() } diff --git a/run/executor.go b/run/executor.go index 9cad80e..32f76ad 100644 --- a/run/executor.go +++ b/run/executor.go @@ -101,6 +101,10 @@ type Result struct { Steps []tool.Step Usage llm.Usage Err error + // PostRunResult carries artifacts produced by a SessionToolFactory's PostRun + // hook (rendered images, files). nil when no factory was set or PostRun + // returned nil. The host delivers these (e.g. mort's chat API / Discord). + PostRunResult *tool.PostRunResult } // Run executes ra with the given invocation + input and returns the Result. It @@ -176,6 +180,12 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio inv.RunState = stateAcc } + // Steer mailbox: lets session tools (via inv.AttachImages) feed multimodal + // messages into the running conversation before its next step. 
Created BEFORE + // the toolbox build so any tool's handler captures the live AttachImages seam. + mailbox := &steerMailbox{} + inv.AttachImages = (&runSession{mailbox: mailbox}).AttachImages + // Build the toolbox from the agent's low-level tools. toolbox, err := e.cfg.Registry.Build(ra.LowLevelTools, inv, tool.Visibility("private"), nil) if err != nil { @@ -192,6 +202,34 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio return res } + // Per-invocation ExtraTools + a SessionToolFactory's per-run tools, added on + // top of the agent's palette. The factory closes over the live session (the + // AttachImages mailbox); its PostRun hook (held for after the run) produces + // artifacts attached to res.PostRunResult, and its Cleanup is deferred. All + // nil-safe. + for _, t := range inv.ExtraTools { + if err := toolbox.Add(t); err != nil { + res.Err = fmt.Errorf("add extra tool: %w", err) + e.finishAudit(ctx, rec, "error", res, started, res.Err) + return res + } + } + var postRun func(ctx context.Context, transcript []llm.Message, output string, runErr error) *tool.PostRunResult + if inv.SessionToolFactory != nil { + st := inv.SessionToolFactory(&runSession{mailbox: mailbox}) + if st.Cleanup != nil { + defer st.Cleanup() + } + for _, t := range st.Tools { + if err := toolbox.Add(t); err != nil { + res.Err = fmt.Errorf("add session tool: %w", err) + e.finishAudit(ctx, rec, "error", res, started, res.Err) + return res + } + } + postRun = st.PostRun + } + // Run context: bound by MaxRuntime, detached from the caller's deadline so a // lane/queue wait doesn't eat the run budget (mort's V10 lesson). Caller // cancellation still propagates via MergeCancellation. Created BEFORE the @@ -279,7 +317,10 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio } ag := agent.New(model, e.systemPrompt(ra), opts...) - runRes, runErr := ag.Run(runCtx, input, critic.steerOptions()...) 
+ // One WithSteer drains BOTH the session mailbox (a tool's AttachImages) and + // the critic's nudges before each step. + steer := func() []llm.Message { return append(mailbox.drain(), critic.drainSteer()...) } + runRes, runErr := ag.Run(runCtx, input, agent.WithSteer(steer)) status := statusFor(runCtx, runErr) if runRes != nil { @@ -289,6 +330,17 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio res.Steps = emitter.snapshot() res.Err = runErr + // PostRun: hand the SessionToolFactory's hook the full transcript (populated + // even on partial results) so it can produce artifacts. Best-effort + + // panic-isolated — a PostRun failure never fails an otherwise-successful run. + if postRun != nil { + var transcript []llm.Message + if runRes != nil { + transcript = runRes.Messages + } + res.PostRunResult = runPostRun(ctx, postRun, transcript, res.Output, runErr) + } + e.finishAudit(ctx, rec, status, res, started, runErr) if e.cfg.Ports.Budget != nil { e.cfg.Ports.Budget.Commit(detach(ctx), inv.CallerID, time.Since(started).Seconds()) diff --git a/run/session.go b/run/session.go new file mode 100644 index 0000000..21b7a04 --- /dev/null +++ b/run/session.go @@ -0,0 +1,76 @@ +package run + +import ( + "context" + "log/slog" + "strings" + "sync" + + "gitea.stevedudenhoeffer.com/steve/majordomo/llm" + + "gitea.stevedudenhoeffer.com/steve/executus/tool" +) + +// runPostRun invokes a SessionToolFactory's PostRun hook with panic isolation: +// a PostRun panic (or a slow artifact build that the hook mishandles) must not +// fail an otherwise-successful run — artifacts are best-effort, the agent's text +// output is the source of truth. 
+func runPostRun(ctx context.Context,
+	hook func(context.Context, []llm.Message, string, error) *tool.PostRunResult,
+	transcript []llm.Message, output string, runErr error) (artifacts *tool.PostRunResult) {
+	defer func() {
+		// A panicking hook is logged and swallowed; the caller then gets nil.
+		if recovered := recover(); recovered != nil {
+			slog.Error("run: PostRun hook panicked; no artifacts produced", "panic", recovered)
+			artifacts = nil
+		}
+	}()
+	artifacts = hook(ctx, transcript, output, runErr)
+	return artifacts
+}
+
+// steerMailbox is a mutex-guarded queue of messages waiting to be injected into
+// the agent loop before its next step. Session tools fill it through
+// tool.Invocation.AttachImages, so the model can SEE content (e.g. a rendered
+// preview) rather than only be told about it; the critic's nudges share the path.
+type steerMailbox struct {
+	mu   sync.Mutex
+	msgs []llm.Message
+}
+
+// push appends msg to the queue while holding the lock.
+func (m *steerMailbox) push(msg llm.Message) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	m.msgs = append(m.msgs, msg)
+}
+
+// drain hands back everything queued so far and empties the queue; nil if empty.
+func (m *steerMailbox) drain() (queued []llm.Message) {
+	m.mu.Lock()
+	defer m.mu.Unlock()
+	if len(m.msgs) > 0 {
+		queued, m.msgs = m.msgs, nil
+	}
+	return queued
+}
+
+// runSession implements tool.AgentSession over a steer mailbox: AttachImages
+// queues a user-role multimodal message the agent loop injects before its next
+// step. Replaces legacy agentkit's Agent.AttachImages — majordomo's *agent.Agent
+// is immutable mid-run, so mutation flows through the run-scoped steer mailbox.
+type runSession struct{ mailbox *steerMailbox }
+
+// AttachImages queues text (when non-blank) plus images as one llm.UserParts message.
+func (s *runSession) AttachImages(text string, images ...llm.ImagePart) {
+	content := make([]llm.Part, 0, 1+len(images))
+	if strings.TrimSpace(text) != "" {
+		content = append(content, llm.Text(text))
+	}
+	for i := range images {
+		content = append(content, images[i])
+	}
+	if len(content) > 0 {
+		s.mailbox.push(llm.UserParts(content...))
+	}
+}
diff --git a/run/session_test.go b/run/session_test.go
new file mode 100644
index 0000000..1a4809f
--- /dev/null
+++ b/run/session_test.go
@@ -0,0 +1,94 @@
+package run_test
+
+import (
+	"context"
+	"testing"
+
+	"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
+	"gitea.stevedudenhoeffer.com/steve/majordomo/provider/fake"
+
+	"gitea.stevedudenhoeffer.com/steve/executus/run"
+	"gitea.stevedudenhoeffer.com/steve/executus/tool"
+)
+
+// TestSessionToolFactoryPostRun: a SessionToolFactory's PostRun hook produces an
+// artifact (from the run output + transcript) that lands on Result.PostRunResult,
+// and its Cleanup is deferred.
+func TestSessionToolFactoryPostRun(t *testing.T) {
+	provider := fake.New("fake")
+	provider.Enqueue("m", fake.Reply("hello artifacts"))
+	model, _ := provider.Model("m")
+
+	// The hook wraps the run output in one text artifact and records the transcript length.
+	postRun := func(_ context.Context, transcript []llm.Message, output string, _ error) *tool.PostRunResult {
+		artifact := tool.Artifact{Name: "out.txt", MimeType: "text/plain", Data: []byte(output)}
+		return &tool.PostRunResult{
+			Artifacts: []tool.Artifact{artifact},
+			Metadata:  map[string]any{"transcript_len": len(transcript)},
+		}
+	}
+	cleaned := false
+	factory := func(_ tool.AgentSession) tool.SessionTools {
+		return tool.SessionTools{PostRun: postRun, Cleanup: func() { cleaned = true }}
+	}
+
+	executor := run.New(run.Config{
+		Registry: tool.NewRegistry(),
+		Models:   func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, model, nil },
+	})
+	ra := run.RunnableAgent{ModelTier: "m"}
+	invocation := tool.Invocation{RunID: "r1", SessionToolFactory: factory}
+	result := executor.Run(context.Background(), ra, invocation, "go")
+	if result.Err != nil {
+		t.Fatalf("run error: %v", result.Err)
+	}
+	got := result.PostRunResult
+	if got == nil {
+		t.Fatal("Result.PostRunResult is nil — PostRun hook not invoked / not attached")
+	}
+	if count := len(got.Artifacts); count != 1 {
+		t.Fatalf("artifacts = %d, want 1", count)
+	}
+	if first := got.Artifacts[0]; first.Name != "out.txt" || string(first.Data) != "hello artifacts" {
+		t.Errorf("artifact = {%q, %q}", first.Name, string(first.Data))
+	}
+	if seen, _ := got.Metadata["transcript_len"].(int); seen < 1 {
+		t.Errorf("transcript not passed to PostRun (len=%d)", seen)
+	}
+	if !cleaned {
+		t.Error("Cleanup was not deferred/called")
+	}
+}
+
+// TestSessionToolFactoryAddsTool: tools the factory returns join the run's
+// toolbox and are callable by the model.
+func TestSessionToolFactoryAddsTool(t *testing.T) {
+	// Scripted replies, in order: a "render" tool call, then a plain "rendered" reply.
+	callRender := llm.Response{ToolCalls: []llm.ToolCall{{ID: "c1", Name: "render", Arguments: []byte(`{}`)}}}
+	provider := fake.New("fake")
+	provider.Enqueue("m", fake.ReplyWith(callRender), fake.Reply("rendered"))
+	model, _ := provider.Model("m")
+
+	invoked := false
+	handler := func(_ context.Context, _ struct{}) (any, error) {
+		invoked = true
+		return "ok", nil
+	}
+	render := llm.DefineTool("render", "render a preview", handler)
+	factory := func(_ tool.AgentSession) tool.SessionTools {
+		return tool.SessionTools{Tools: []llm.Tool{render}}
+	}
+
+	executor := run.New(run.Config{
+		Registry: tool.NewRegistry(),
+		Models:   func(ctx context.Context, _ string) (context.Context, llm.Model, error) { return ctx, model, nil },
+	})
+	ra := run.RunnableAgent{ModelTier: "m", MaxIterations: 5}
+	result := executor.Run(context.Background(), ra, tool.Invocation{RunID: "r2", SessionToolFactory: factory}, "go")
+	if result.Err != nil {
+		t.Fatalf("run error: %v", result.Err)
+	}
+	if !invoked {
+		t.Error("session-factory tool was not added to the toolbox / not called")
+	}
+}