4132af0216
- compact/compactor.go: the per-run stateful context compactor (token-threshold gate, fast-tier middle summarisation, fold memory) lifted from mort's skillexec/compactor.go. Self-contained; its only dependency is a ModelResolver func (model.ParseModelForContext satisfies it) + a token threshold. - run/steps.go: the step-emission/instrumentation (stepEmitter, tool->kind/ summary mapping with redaction, Result.Steps accumulation) from agentexec, repointed onto executus/tool. Both build green. With the run-loop mechanics, RunnableAgent DTO, run.Ports, compactor, and step instrumentation now all in place, the remaining P2 work is the run.Executor itself (wiring these + majordomo's agent loop), which makes executus runnable. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
408 lines
12 KiB
Go
408 lines
12 KiB
Go
package run
|
|
|
|
// steps.go — the per-run step emitter and the tool→step presentation
|
|
// mapping. This is the single place that turns the executor's two loop
|
|
// chokepoints (the pre-dispatch tool hook + the post-step observer in
|
|
// executor.go) into ordered tool.Step records: one per tool call,
|
|
// each with a stable id, an open-vocabulary kind, and a human
|
|
// present-tense summary that flips running→complete/error.
|
|
//
|
|
// One source feeds two consumers (mirroring the OnEvent/OnToolEvent/
|
|
// PostRunResult pattern): the live tool.Invocation.OnStep callback
|
|
// (nil-safe) AND snapshot(), which the executor copies onto Result.Steps.
|
|
// Because the Result accumulation does not depend on OnStep being set,
|
|
// every surface — chat (JSON + SSE), Discord, cron, sub-agents — carries
|
|
// steps; OnStep is needed only for live streaming.
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"net/url"
|
|
"strings"
|
|
"time"
|
|
|
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
|
|
|
"gitea.stevedudenhoeffer.com/steve/executus/tool"
|
|
)
|
|
|
|
// Size caps for step presentation (section G).
//
// stepSummaryMaxLen is the longest human summary a step may carry, in
// runes (applied via truncateStep). Detail has no cap yet: v1 never fills
// it, since replies are generated blocking and so there is no live source
// for detail text.
const (
	stepSummaryMaxLen = 200
)
|
|
|
|
// stepEmitter accumulates ordered steps for one run and fires the live
|
|
// OnStep callback.
|
|
//
|
|
// Concurrency: touched ONLY from the agent-loop goroutine. Both call
|
|
// sites (the hookToolbox `before` closure and the stepObserver) run
|
|
// there; majordomo executes a step's tool calls sequentially, and
|
|
// sub-agents build their own Invocation so they never reach this
|
|
// emitter. Same single-goroutine contract as the audit Writer and the
|
|
// critic ProgressRecorder — no internal lock.
|
|
type stepEmitter struct {
|
|
onStep func(ctx context.Context, ev tool.StepEvent)
|
|
now func() time.Time
|
|
|
|
seq int
|
|
steps []tool.Step // ordered; the snapshot for Result.Steps
|
|
byID map[string]int // step id -> index into steps
|
|
pending map[string][]string // correlation key -> queued running ids (FIFO)
|
|
}
|
|
|
|
// newStepEmitter builds an empty emitter that reports to onStep. onStep
// may be nil, in which case steps are still accumulated for snapshot but
// nothing is streamed. The clock defaults to time.Now.
func newStepEmitter(onStep func(ctx context.Context, ev tool.StepEvent)) *stepEmitter {
	e := new(stepEmitter)
	e.onStep = onStep
	e.now = time.Now
	e.byID = make(map[string]int)
	e.pending = make(map[string][]string)
	return e
}
|
|
|
|
// corrKey builds the key that pairs a call's "start" with its "end". The
// pre-dispatch hook has no call id, only the tool name and raw args. The
// stepObserver later sees the full call (id included) with the same raw
// args, so name+args is the common denominator. A NUL byte separates the
// two parts.
func corrKey(name string, args json.RawMessage) string {
	return strings.Join([]string{name, string(args)}, "\x00")
}
|
|
|
|
// toolStart mints a "running" step for a tool call that is about to be
// dispatched, queues its id under the call's correlation key so toolEnd
// can find it, and fires the "start" event. It runs from the pre-dispatch
// hookToolbox closure. A nil receiver is a no-op.
func (e *stepEmitter) toolStart(ctx context.Context, name string, args json.RawMessage) {
	if e != nil {
		started := e.newStep(name, args)
		k := corrKey(name, args)
		e.pending[k] = append(e.pending[k], started.ID)
		e.fire(ctx, "start", started)
	}
}
|
|
|
|
// toolEnd finishes the step for a completed tool call. It is called by the
// stepObserver once per call. The step is marked "error" or "complete",
// stamped with EndedAt, and given a result summary when summaryForEnd has
// one; then the "end" event fires.
//
// If no matching start was recorded (e.g. a tool with a nil handler that
// the pre-dispatch hook skipped), the step is minted here so it still
// shows up in the snapshot. Only the "end" event is fired for such a step.
// A nil receiver is a no-op.
func (e *stepEmitter) toolEnd(ctx context.Context, call llm.ToolCall, result string, isError bool) {
	if e == nil {
		return
	}

	stepID := e.matchPending(call.Name, call.Arguments)
	if stepID == "" {
		stepID = e.newStep(call.Name, call.Arguments).ID
	}
	i, found := e.byID[stepID]
	if !found {
		return
	}

	finished := e.now()
	status := "complete"
	if isError {
		status = "error"
	}

	st := &e.steps[i]
	st.EndedAt = &finished
	st.Status = status
	if phrase := summaryForEnd(call.Name, call.Arguments, result, isError); phrase != "" {
		st.Summary = phrase
	}
	e.fire(ctx, "end", *st)
}
|
|
|
|
// newStep allocates the next sequential id ("s1", "s2", …), appends a
// "running" step for the tool to the ordered list, indexes it by id, and
// returns a copy of it. It fires no event; callers decide that.
func (e *stepEmitter) newStep(name string, args json.RawMessage) tool.Step {
	e.seq++

	fresh := tool.Step{
		ID:        fmt.Sprintf("s%d", e.seq),
		Kind:      kindForTool(name),
		Title:     name,
		Summary:   summaryForStart(name, args),
		Status:    "running",
		StartedAt: e.now(),
	}

	pos := len(e.steps)
	e.steps = append(e.steps, fresh)
	e.byID[fresh.ID] = pos
	return fresh
}
|
|
|
|
// matchPending finds the running step a finished call belongs to and
// removes it from the pending queues, so no step can be matched twice.
//
// It first pops the oldest queued id for the exact (name, args) key. When
// the args don't byte-match between start and end, it falls back to the
// most recent still-running step with the same tool name. Returns "" when
// nothing matches (toolEnd then mints a step itself).
func (e *stepEmitter) matchPending(name string, args json.RawMessage) string {
	key := corrKey(name, args)
	for q := e.pending[key]; len(q) > 0; q = e.pending[key] {
		id := q[0]
		if len(q) == 1 {
			delete(e.pending, key)
		} else {
			e.pending[key] = q[1:]
		}
		// Defensive: never hand back a step that has already finished.
		// Re-completing it would fire a second "end" and strand the call
		// that actually owns this key.
		if idx, ok := e.byID[id]; ok && e.steps[idx].Status == "running" {
			return id
		}
	}

	for i := len(e.steps) - 1; i >= 0; i-- {
		if e.steps[i].Title == name && e.steps[i].Status == "running" {
			id := e.steps[i].ID
			// The matched step is still queued under its start-time key.
			// Dequeue it now; otherwise a later exact-key end would pop this
			// finished id and that call's own step would stay running forever.
			e.dropPending(id)
			return id
		}
	}
	return ""
}

// dropPending removes id from whichever correlation queue still holds it,
// deleting the queue once it is empty. Ids are unique (newStep mints them
// from a counter), so at most one queue contains it.
func (e *stepEmitter) dropPending(id string) {
	for key, q := range e.pending {
		for i, queued := range q {
			if queued != id {
				continue
			}
			if len(q) == 1 {
				delete(e.pending, key)
			} else {
				// Full-slice expression forces a fresh backing array, so no
				// other view of q is mutated.
				e.pending[key] = append(q[:i:i], q[i+1:]...)
			}
			return
		}
	}
}
|
|
|
|
// fire forwards one lifecycle event ("start" or "end") for step to the
// live OnStep callback. It does nothing when no callback was supplied.
func (e *stepEmitter) fire(ctx context.Context, phase string, step tool.Step) {
	if cb := e.onStep; cb != nil {
		cb(ctx, tool.StepEvent{Phase: phase, Step: step})
	}
}
|
|
|
|
// snapshot returns a copy of the ordered, deduplicated step set for the
|
|
// run Result. A step still "running" at run end (e.g. the run was
|
|
// cancelled mid-tool-call) is reported as-is.
|
|
func (e *stepEmitter) snapshot() []tool.Step {
|
|
if e == nil || len(e.steps) == 0 {
|
|
return nil
|
|
}
|
|
out := make([]tool.Step, len(e.steps))
|
|
copy(out, e.steps)
|
|
return out
|
|
}
|
|
|
|
// kindForTool classifies a tool name into an open-vocabulary step kind:
// "search", "read", "code", "file", "memory", "delegate", "thinking",
// "image", or the generic "tool". An unrecognized name is never an error.
// It becomes a generic "tool" step, which the client renders with a
// default icon. The name lists loosely track the catalog in
// pkg/skilltools/CLAUDE.md.
func kindForTool(name string) string {
	switch name {
	case "think":
		return "thinking"
	case "code_exec", "calculate":
		return "code"
	case "web_search", "search_reddit", "wikipedia_summary":
		return "search"
	case "file_save", "file_get", "file_list", "file_delete", "file_search":
		return "file"
	case "kv_get", "kv_set", "kv_list", "kv_delete", "remember", "recall", "chatbot_get_memories":
		return "memory"
	case "read_page", "read_pdf", "read_reddit", "read_video", "verify_url":
		return "read"
	case "summary_summarise", "summarize", "file_get_text", "file_get_metadata":
		return "read"
	case "http_get", "http_post", "http_get_stream", "http_stream_read":
		return "read"
	case "query", "query_research", "deepresearch", "animate":
		return "delegate"
	case "agent_invoke", "agent_invoke_parallel", "agent_spawn", "agent_spawn_parallel":
		return "delegate"
	case "skill_invoke", "skill_invoke_parallel":
		return "delegate"
	}

	// Image tools are recognized by shape rather than by exact name.
	if strings.HasPrefix(name, "image") || strings.Contains(name, "draw") {
		return "image"
	}
	return "tool"
}
|
|
|
|
// summaryForStart builds the human present-tense running summary. It
|
|
// derives specifics from safe arg fields only; secret-bearing tools
|
|
// (mcp_call, email_send, http_*) are summarized without echoing args.
|
|
func summaryForStart(name string, args json.RawMessage) string {
|
|
var s string
|
|
switch name {
|
|
case "web_search":
|
|
if q := argString(args, "query", "q"); q != "" {
|
|
s = fmt.Sprintf("Searching the web for %q", q)
|
|
} else {
|
|
s = "Searching the web"
|
|
}
|
|
case "search_reddit":
|
|
if q := argString(args, "query", "q"); q != "" {
|
|
s = fmt.Sprintf("Searching Reddit for %q", q)
|
|
} else {
|
|
s = "Searching Reddit"
|
|
}
|
|
case "wikipedia_summary":
|
|
if q := argString(args, "query", "title"); q != "" {
|
|
s = fmt.Sprintf("Looking up %q on Wikipedia", q)
|
|
} else {
|
|
s = "Looking up Wikipedia"
|
|
}
|
|
case "read_page", "read_pdf", "read_reddit", "read_video", "verify_url":
|
|
if u := argString(args, "url", "post", "page"); u != "" {
|
|
s = "Reading " + hostOf(u)
|
|
} else {
|
|
s = "Reading a page"
|
|
}
|
|
case "http_get", "http_post", "http_get_stream":
|
|
// Show host only — a full URL can embed credentials/tokens.
|
|
if u := argString(args, "url"); u != "" {
|
|
s = "Fetching " + hostOf(u)
|
|
} else {
|
|
s = "Making an HTTP request"
|
|
}
|
|
case "summary_summarise", "summarize":
|
|
s = "Summarizing text"
|
|
case "translate":
|
|
if lang := argString(args, "target_lang", "target_language", "lang"); lang != "" {
|
|
s = "Translating to " + lang
|
|
} else {
|
|
s = "Translating text"
|
|
}
|
|
case "code_exec":
|
|
s = "Running code"
|
|
case "calculate":
|
|
if q := argString(args, "query", "expression", "expr"); q != "" {
|
|
s = "Calculating " + truncateStep(q, 60)
|
|
} else {
|
|
s = "Calculating"
|
|
}
|
|
case "remember":
|
|
// Never echo the stored value.
|
|
s = "Saving a memory"
|
|
case "recall", "chatbot_get_memories":
|
|
s = "Recalling memories"
|
|
case "kv_get", "kv_list":
|
|
s = "Reading saved data"
|
|
case "kv_set":
|
|
s = "Saving data"
|
|
case "kv_delete":
|
|
s = "Deleting saved data"
|
|
case "file_save":
|
|
if n := argString(args, "name", "filename"); n != "" {
|
|
s = "Saving file " + truncateStep(n, 60)
|
|
} else {
|
|
s = "Saving a file"
|
|
}
|
|
case "file_get", "file_get_text", "file_get_metadata":
|
|
s = "Reading a file"
|
|
case "file_list", "file_search":
|
|
s = "Listing files"
|
|
case "query", "query_research":
|
|
if q := argString(args, "query", "question", "prompt", "task"); q != "" {
|
|
s = "Researching " + truncateStep(q, 80)
|
|
} else {
|
|
s = "Researching"
|
|
}
|
|
case "deepresearch":
|
|
s = "Running deep research"
|
|
case "animate":
|
|
s = "Generating an animation"
|
|
case "agent_invoke", "agent_spawn":
|
|
if a := argString(args, "agent", "agent_name", "name"); a != "" {
|
|
s = "Delegating to " + a
|
|
} else {
|
|
s = "Delegating to a sub-agent"
|
|
}
|
|
case "agent_invoke_parallel", "agent_spawn_parallel":
|
|
s = "Delegating to sub-agents"
|
|
case "skill_invoke":
|
|
if sk := argString(args, "skill_name", "skill", "name"); sk != "" {
|
|
s = "Running skill " + sk
|
|
} else {
|
|
s = "Running a skill"
|
|
}
|
|
case "skill_invoke_parallel":
|
|
s = "Running skills in parallel"
|
|
case "think":
|
|
s = "Thinking"
|
|
case "mcp_call":
|
|
// Redact: MCP args frequently carry secrets. Name server/tool only.
|
|
srv, tl := argString(args, "server"), argString(args, "tool")
|
|
switch {
|
|
case srv != "" && tl != "":
|
|
s = fmt.Sprintf("Calling %s/%s", srv, tl)
|
|
case srv != "":
|
|
s = "Calling " + srv
|
|
default:
|
|
s = "Calling an MCP tool"
|
|
}
|
|
case "email_send":
|
|
// Redact recipients + body.
|
|
s = "Sending an email"
|
|
default:
|
|
s = "Using " + name
|
|
}
|
|
return truncateStep(s, stepSummaryMaxLen)
|
|
}
|
|
|
|
// summaryForEnd returns a short result phrase to replace the running
// summary once a call finishes, or "" to keep the running summary (the
// caller then only flips the status). Failed calls always keep it. Today
// only web_search and search_reddit get a phrase, "Found N result(s)".
// That phrase is derived from the parsed result count, never from raw
// result text.
func summaryForEnd(name string, _ json.RawMessage, result string, isError bool) string {
	searchTool := name == "web_search" || name == "search_reddit"
	if isError || !searchTool {
		return ""
	}
	n := countResults(result)
	if n < 0 {
		return ""
	}
	return fmt.Sprintf("Found %d result%s", n, plural(n))
}
|
|
|
|
// argString decodes a tool's raw JSON args as an object and returns the
// whitespace-trimmed value of the first key (in the given order) whose
// value is a non-blank string. It returns "" for empty or non-object args,
// or when no key yields a usable string.
func argString(args json.RawMessage, keys ...string) string {
	var fields map[string]any
	if len(args) == 0 || json.Unmarshal(args, &fields) != nil {
		return ""
	}
	for _, key := range keys {
		str, isStr := fields[key].(string)
		if !isStr {
			continue
		}
		if trimmed := strings.TrimSpace(str); trimmed != "" {
			return trimmed
		}
	}
	return ""
}
|
|
|
|
// countResults reads a v11-style {"results":[...]} envelope and returns
// the number of entries. It returns -1 for blank input, invalid JSON, or
// a payload whose "results" field is missing or null.
func countResults(result string) int {
	if len(strings.TrimSpace(result)) == 0 {
		return -1
	}
	var envelope struct {
		Results []json.RawMessage `json:"results"`
	}
	err := json.Unmarshal([]byte(result), &envelope)
	switch {
	case err != nil, envelope.Results == nil:
		return -1
	default:
		return len(envelope.Results)
	}
}
|
|
|
|
// hostOf returns a display-safe location for a URL-ish string. When raw
// parses as a URL with a host, that host is returned with any leading
// "www." removed.
//
// Otherwise (scheme-less input, invalid percent-escapes, plain ids) it
// falls back to a shortened form of raw. The parts that commonly carry
// secrets are stripped first: the query string, the fragment, and any
// "user:pass@" userinfo. This keeps callers that promise "host only"
// (the http_* summaries) from echoing tokens. If a scheme was present,
// only the authority is kept. Returns "a link" when nothing displayable
// remains.
func hostOf(raw string) string {
	if u, err := url.Parse(raw); err == nil && u.Host != "" {
		return strings.TrimPrefix(u.Host, "www.")
	}

	s := raw
	if i := strings.IndexAny(s, "?#"); i >= 0 {
		s = s[:i]
	}
	_, rest, hasScheme := strings.Cut(s, "://")
	if hasScheme {
		s = rest
	}

	// Userinfo can only appear in the authority, i.e. before the first '/'.
	authority := s
	if i := strings.IndexByte(s, '/'); i >= 0 {
		authority = s[:i]
	}
	if at := strings.LastIndexByte(authority, '@'); at >= 0 {
		s = s[at+1:]
		authority = authority[at+1:]
	}
	if hasScheme {
		s = strings.TrimPrefix(authority, "www.")
	}

	if s == "" {
		return "a link"
	}
	return truncateStep(s, 60)
}
|
|
|
|
// truncateStep shortens s to at most max runes without splitting a
// multi-byte character. When it has to cut, the last kept rune is replaced
// by "…" so the result stays within max. The exception is max == 1, where
// just the first rune is returned. A non-positive max yields "".
func truncateStep(s string, max int) string {
	if max <= 0 {
		return ""
	}
	runes := []rune(s)
	switch {
	case len(runes) <= max:
		return s
	case max == 1:
		return string(runes[:1])
	default:
		return string(runes[:max-1]) + "…"
	}
}
|
|
|
|
// plural returns the English plural suffix for a count: "" for exactly
// one and "s" for anything else (including zero and negatives).
func plural(n int) string {
	suffix := "s"
	if n == 1 {
		suffix = ""
	}
	return suffix
}
|