package run // steps.go — the per-run step emitter and the tool→step presentation // mapping. This is the single place that turns the executor's two loop // chokepoints (the pre-dispatch tool hook + the post-step observer in // executor.go) into ordered tool.Step records: one per tool call, // each with a stable id, an open-vocabulary kind, and a human // present-tense summary that flips running→complete/error. // // One source feeds two consumers (mirroring the OnEvent/OnToolEvent/ // PostRunResult pattern): the live tool.Invocation.OnStep callback // (nil-safe) AND snapshot(), which the executor copies onto Result.Steps. // Because the Result accumulation does not depend on OnStep being set, // every surface — chat (JSON + SSE), Discord, cron, sub-agents — carries // steps; OnStep is needed only for live streaming. import ( "context" "encoding/json" "fmt" "net/url" "strings" "time" "gitea.stevedudenhoeffer.com/steve/majordomo/llm" "gitea.stevedudenhoeffer.com/steve/executus/tool" ) // stepSummaryMaxLen caps the human summary length (section G size cap). // Detail is unused in v1 (no live detail source while replies are // generated blocking) so there is no Detail cap yet. const stepSummaryMaxLen = 200 // stepEmitter accumulates ordered steps for one run and fires the live // OnStep callback. // // Concurrency: touched ONLY from the agent-loop goroutine. Both call // sites (the hookToolbox `before` closure and the stepObserver) run // there; majordomo executes a step's tool calls sequentially, and // sub-agents build their own Invocation so they never reach this // emitter. Same single-goroutine contract as the audit Writer and the // critic ProgressRecorder — no internal lock. 
type stepEmitter struct { onStep func(ctx context.Context, ev tool.StepEvent) now func() time.Time seq int steps []tool.Step // ordered; the snapshot for Result.Steps byID map[string]int // step id -> index into steps pending map[string][]string // correlation key -> queued running ids (FIFO) } // newStepEmitter returns an emitter that forwards to onStep (nil-safe). func newStepEmitter(onStep func(ctx context.Context, ev tool.StepEvent)) *stepEmitter { return &stepEmitter{ onStep: onStep, now: time.Now, byID: map[string]int{}, pending: map[string][]string{}, } } // corrKey correlates a "start" (name + raw args, no call id available at // the pre-dispatch hook) with its later "end" (the stepObserver has the // full call incl. id + the same raw args). func corrKey(name string, args json.RawMessage) string { return name + "\x00" + string(args) } // toolStart records + emits the "start" of a tool call. Called from the // pre-dispatch hookToolbox closure, before the tool runs. func (e *stepEmitter) toolStart(ctx context.Context, name string, args json.RawMessage) { if e == nil { return } step := e.newStep(name, args) key := corrKey(name, args) e.pending[key] = append(e.pending[key], step.ID) e.fire(ctx, "start", step) } // toolEnd records + emits the terminal "end" of a tool call. Called from // the stepObserver for each completed tool call. If no matching start was // seen (e.g. a tool with a nil handler the pre-dispatch hook skipped), a // start is synthesized so the step still appears. 
func (e *stepEmitter) toolEnd(ctx context.Context, call llm.ToolCall, result string, isError bool) { if e == nil { return } id := e.matchPending(call.Name, call.Arguments) if id == "" { id = e.newStep(call.Name, call.Arguments).ID } idx, ok := e.byID[id] if !ok { return } step := &e.steps[idx] end := e.now() step.EndedAt = &end if isError { step.Status = "error" } else { step.Status = "complete" } if s := summaryForEnd(call.Name, call.Arguments, result, isError); s != "" { step.Summary = s } e.fire(ctx, "end", *step) } // newStep mints + appends a running step and returns it (by value). func (e *stepEmitter) newStep(name string, args json.RawMessage) tool.Step { e.seq++ step := tool.Step{ ID: fmt.Sprintf("s%d", e.seq), Kind: kindForTool(name), Title: name, Summary: summaryForStart(name, args), Status: "running", StartedAt: e.now(), } e.byID[step.ID] = len(e.steps) e.steps = append(e.steps, step) return step } // matchPending pops the oldest running step id for (name, args). Falls // back to the most recent still-running step of the same tool name when // the args don't byte-match between start and end. Returns "" on no match. func (e *stepEmitter) matchPending(name string, args json.RawMessage) string { key := corrKey(name, args) if q := e.pending[key]; len(q) > 0 { id := q[0] if len(q) == 1 { delete(e.pending, key) } else { e.pending[key] = q[1:] } return id } for i := len(e.steps) - 1; i >= 0; i-- { if e.steps[i].Title == name && e.steps[i].Status == "running" { return e.steps[i].ID } } return "" } func (e *stepEmitter) fire(ctx context.Context, phase string, step tool.Step) { if e.onStep == nil { return } e.onStep(ctx, tool.StepEvent{Phase: phase, Step: step}) } // snapshot returns a copy of the ordered, deduplicated step set for the // run Result. A step still "running" at run end (e.g. the run was // cancelled mid-tool-call) is reported as-is. 
func (e *stepEmitter) snapshot() []tool.Step { if e == nil || len(e.steps) == 0 { return nil } out := make([]tool.Step, len(e.steps)) copy(out, e.steps) return out } // kindForTool maps a tool name to an open-vocabulary step kind. Unknown // tools fall back to "tool" — never an error, just a generic step (the // client maps unknown kinds to a default icon). Loosely tracks the // catalog in pkg/skilltools/CLAUDE.md. func kindForTool(name string) string { switch name { case "web_search", "search_reddit", "wikipedia_summary": return "search" case "read_page", "read_pdf", "read_reddit", "read_video", "verify_url", "summary_summarise", "summarize", "file_get_text", "file_get_metadata", "http_get", "http_post", "http_get_stream", "http_stream_read": return "read" case "code_exec", "calculate": return "code" case "file_save", "file_get", "file_list", "file_delete", "file_search": return "file" case "kv_get", "kv_set", "kv_list", "kv_delete", "remember", "recall", "chatbot_get_memories": return "memory" case "query", "query_research", "deepresearch", "animate", "agent_invoke", "agent_invoke_parallel", "agent_spawn", "agent_spawn_parallel", "skill_invoke", "skill_invoke_parallel": return "delegate" case "think": return "thinking" default: switch { case strings.HasPrefix(name, "image") || strings.Contains(name, "draw"): return "image" default: return "tool" } } } // summaryForStart builds the human present-tense running summary. It // derives specifics from safe arg fields only; secret-bearing tools // (mcp_call, email_send, http_*) are summarized without echoing args. 
func summaryForStart(name string, args json.RawMessage) string { var s string switch name { case "web_search": if q := argString(args, "query", "q"); q != "" { s = fmt.Sprintf("Searching the web for %q", q) } else { s = "Searching the web" } case "search_reddit": if q := argString(args, "query", "q"); q != "" { s = fmt.Sprintf("Searching Reddit for %q", q) } else { s = "Searching Reddit" } case "wikipedia_summary": if q := argString(args, "query", "title"); q != "" { s = fmt.Sprintf("Looking up %q on Wikipedia", q) } else { s = "Looking up Wikipedia" } case "read_page", "read_pdf", "read_reddit", "read_video", "verify_url": if u := argString(args, "url", "post", "page"); u != "" { s = "Reading " + hostOf(u) } else { s = "Reading a page" } case "http_get", "http_post", "http_get_stream": // Show host only — a full URL can embed credentials/tokens. if u := argString(args, "url"); u != "" { s = "Fetching " + hostOf(u) } else { s = "Making an HTTP request" } case "summary_summarise", "summarize": s = "Summarizing text" case "translate": if lang := argString(args, "target_lang", "target_language", "lang"); lang != "" { s = "Translating to " + lang } else { s = "Translating text" } case "code_exec": s = "Running code" case "calculate": if q := argString(args, "query", "expression", "expr"); q != "" { s = "Calculating " + truncateStep(q, 60) } else { s = "Calculating" } case "remember": // Never echo the stored value. 
s = "Saving a memory" case "recall", "chatbot_get_memories": s = "Recalling memories" case "kv_get", "kv_list": s = "Reading saved data" case "kv_set": s = "Saving data" case "kv_delete": s = "Deleting saved data" case "file_save": if n := argString(args, "name", "filename"); n != "" { s = "Saving file " + truncateStep(n, 60) } else { s = "Saving a file" } case "file_get", "file_get_text", "file_get_metadata": s = "Reading a file" case "file_list", "file_search": s = "Listing files" case "query", "query_research": if q := argString(args, "query", "question", "prompt", "task"); q != "" { s = "Researching " + truncateStep(q, 80) } else { s = "Researching" } case "deepresearch": s = "Running deep research" case "animate": s = "Generating an animation" case "agent_invoke", "agent_spawn": if a := argString(args, "agent", "agent_name", "name"); a != "" { s = "Delegating to " + a } else { s = "Delegating to a sub-agent" } case "agent_invoke_parallel", "agent_spawn_parallel": s = "Delegating to sub-agents" case "skill_invoke": if sk := argString(args, "skill_name", "skill", "name"); sk != "" { s = "Running skill " + sk } else { s = "Running a skill" } case "skill_invoke_parallel": s = "Running skills in parallel" case "think": s = "Thinking" case "mcp_call": // Redact: MCP args frequently carry secrets. Name server/tool only. srv, tl := argString(args, "server"), argString(args, "tool") switch { case srv != "" && tl != "": s = fmt.Sprintf("Calling %s/%s", srv, tl) case srv != "": s = "Calling " + srv default: s = "Calling an MCP tool" } case "email_send": // Redact recipients + body. s = "Sending an email" default: s = "Using " + name } return truncateStep(s, stepSummaryMaxLen) } // summaryForEnd optionally upgrades the summary to a cheap result phrase. // Returns "" to keep the running summary (the caller then just flips the // status). Never returns a phrase derived from raw result bytes. 
func summaryForEnd(name string, _ json.RawMessage, result string, isError bool) string {
	if isError {
		return ""
	}
	switch name {
	case "web_search", "search_reddit":
		if n := countResults(result); n >= 0 {
			return fmt.Sprintf("Found %d result%s", n, plural(n))
		}
	}
	return ""
}

// argString pulls the first present, non-blank string field from a tool's
// raw JSON object args, trying keys in order, and returns it trimmed.
// Returns "" when args are empty, are not a JSON object, or no key holds a
// non-blank string.
func argString(args json.RawMessage, keys ...string) string {
	if len(args) == 0 {
		return ""
	}
	var m map[string]any
	if err := json.Unmarshal(args, &m); err != nil {
		return ""
	}
	for _, k := range keys {
		if v, ok := m[k]; ok {
			if s, ok := v.(string); ok && strings.TrimSpace(s) != "" {
				return strings.TrimSpace(s)
			}
		}
	}
	return ""
}

// countResults parses a v11-style {"results":[...]} envelope and returns
// the number of results, or -1 when the shape doesn't match (empty input,
// invalid JSON, or a missing/null "results" field).
func countResults(result string) int {
	if strings.TrimSpace(result) == "" {
		return -1
	}
	var env struct {
		Results []json.RawMessage `json:"results"`
	}
	if err := json.Unmarshal([]byte(result), &env); err != nil || env.Results == nil {
		return -1
	}
	return len(env.Results)
}

// hostOf returns the host (port kept, leading "www." dropped) of an
// absolute URL. For input that url.Parse cannot split into a host — a
// schemeless "api.example.com/v1?token=…", a bare post id, an unparseable
// string — it falls back to a short form of the raw text with the query,
// fragment, and any "user:pass@" prefix removed, because those parts are
// where credentials and tokens usually ride and callers rely on hostOf to
// avoid echoing them.
func hostOf(raw string) string {
	if u, err := url.Parse(raw); err == nil && u.Host != "" {
		return strings.TrimPrefix(u.Host, "www.")
	}
	return truncateStep(stripURLSecrets(raw), 60)
}

// stripURLSecrets removes the query string, fragment, and a userinfo
// prefix (an "@" occurring before the first "/") from a URL-ish string.
func stripURLSecrets(raw string) string {
	s := raw
	if i := strings.IndexAny(s, "?#"); i >= 0 {
		s = s[:i]
	}
	if at := strings.LastIndex(s, "@"); at >= 0 {
		if slash := strings.Index(s, "/"); slash < 0 || at < slash {
			s = s[at+1:]
		}
	}
	return s
}

// truncateStep rune-safely caps s to limit runes, replacing the last kept
// rune with an ellipsis when it cuts (except at limit 1, where there is no
// room for one). A non-positive limit yields "".
func truncateStep(s string, limit int) string {
	if limit <= 0 {
		return ""
	}
	r := []rune(s)
	if len(r) <= limit {
		return s
	}
	if limit == 1 {
		return string(r[:1])
	}
	return string(r[:limit-1]) + "…"
}

// plural returns the English plural suffix for a count: "" for 1, else "s".
func plural(n int) string {
	if n == 1 {
		return ""
	}
	return "s"
}