From 69c2eb5f47c46464308d90ad937c69d7d2994e79 Mon Sep 17 00:00:00 2001 From: Steve Dudenhoeffer Date: Fri, 26 Jun 2026 21:42:46 -0400 Subject: [PATCH] fix: address verified gadfly P2 findings (9 real of 18) Independently verified all 18 gadfly findings against the code (18-agent fan-out). Fixed the 9 real ones; the other 9 were false-positive / hallucinated / valid-tradeoff (no change). High: - F1 nil model: a Models resolver returning (ctx,nil,nil) flowed into the agent loop and nil-panicked. Now a clean error (Run never panics). +test. - F9 compactor data-leak: renderTranscript sent tool-call args verbatim to the summarizer (a possibly-different provider/tier); secret-bearing tool args (mcp_call/email_send/http_*/webhook_*) are now redacted, with a doc note that result bodies still flow (summary needs them). Medium/minor: - F2 compactor error path returned the folded slice, not the original msgs (contradicting the documented non-fatal contract) -> return msgs. - F3 RunStats.Status only ok/error; now timeout (DeadlineExceeded) / cancelled (Canceled) via statusFor. +test. - F4 step-zip emitted empty-name "ghost" steps when results>calls; now pairs min(calls,results) only. - F5 SetIteration was never called -> RunState.Iteration always 0; the step observer now updates it each loop. - F6 matchPending fallback was LIFO; now FIFO (matches the per-key queue). - F7 estimateTokens had no default arm (future Part kinds counted as 0); unknown parts now counted conservatively. - F8 cloud_sync silently truncated >1MiB responses -> opaque JSON error; now a clear "response exceeded N bytes" via readCapped. - F12 step observer captured the caller ctx; now the merged runCtx. - F13 compaction onFire was nil (doc claimed it logged); now wired to audit LogEvent("compaction_fired"). - F11 (no pre-dispatch hook in majordomo) documented honestly as a known limitation; F18 UsageSink doc clarified cache tokens are subsets of input. Co-Authored-By: Claude Opus 4.8 (1M context) --- compact/compactor.go | 46 ++++++++++++++++++++-- model/cloud_sync.go | 20 +++++++++- model/sink.go | 5 +++ run/executor.go | 92 ++++++++++++++++++++++++++++++++------------ run/executor_test.go | 36 +++++++++++++++++ run/steps.go | 36 +++++++++++------ 6 files changed, 193 insertions(+), 42 deletions(-) diff --git a/compact/compactor.go b/compact/compactor.go index 0749ada..91e5c24 100644 --- a/compact/compactor.go +++ b/compact/compactor.go @@ -180,8 +180,11 @@ func compactIfNeeded(ctx context.Context, cfg CompactorConfig, st *compactionSta summary, err := summariseMiddle(ctx, cfg, st.summaryText, middle) if err != nil { - // Non-fatal upstream: the agent loop sends the original slice. - return rendered, fmt.Errorf("compactor: summarise middle: %w", err) + // Non-fatal upstream: the agent loop sends the ORIGINAL slice. Return + // msgs, not `rendered` — on a second+ compaction `rendered` already + // carries a prior synthetic summary, which is not the documented + // "original slice" the loop expects on a compactor error. + return msgs, fmt.Errorf("compactor: summarise middle: %w", err) } st.summaryText = summary st.prefixEnd = endMiddle @@ -285,6 +288,14 @@ func estimateTokens(msgs []llm.Message) int { chars += len(v.Text) case llm.ImagePart: chars += 4096 + default: + // llm.Part is a sealed-but-extensible interface (future media + // kinds). Count an unknown part conservatively (like an image) + // rather than 0, so a transcript of unrecognised content can't + // silently slip under the compaction threshold and 400 the + // model. Bump this if a large new part kind lands. + _ = v + chars += 4096 } } for _, tc := range m.ToolCalls { @@ -302,9 +313,36 @@ func estimateTokens(msgs []llm.Message) int { // summarizer. const transcriptMessageCap = 2048 +// secretBearingTools name tools whose ARGUMENTS routinely carry credentials or +// message bodies (bearer tokens, API keys, recipients, request bodies). Their +// args are dropped before the transcript reaches the summarizer model — which +// may be a different provider/tier than the run model — mirroring the redaction +// run/steps.go applies to user-facing step summaries. http_* and webhook_* are +// matched by prefix below. +var secretBearingTools = map[string]bool{ + "mcp_call": true, + "email_send": true, +} + +// redactToolArgs returns a summariser-safe rendering of a tool call's args: +// "[redacted]" for known secret-bearing tools, the args verbatim otherwise. +func redactToolArgs(name, args string) string { + if secretBearingTools[name] || + strings.HasPrefix(name, "http_") || + strings.HasPrefix(name, "webhook_") { + return "[redacted]" + } + return args +} + // renderTranscript flattens a message slice to a plain-text transcript -// suitable for the summarisation prompt. Tool calls show name + args, +// suitable for the summarisation prompt. Tool calls show name + (redacted) args, // tool results show name + body. Empty fields are skipped. +// +// NOTE: tool-RESULT bodies are forwarded to the summarizer (the summary needs +// the findings). A host whose tool results may contain secrets and whose +// summarizer tier resolves to an untrusted provider should ensure that tier is +// trusted, or pre-sanitise results before they reach the agent loop. func renderTranscript(msgs []llm.Message) string { var sb strings.Builder for i, m := range msgs { @@ -314,7 +352,7 @@ func renderTranscript(msgs []llm.Message) string { sb.WriteString("\n") } for _, tc := range m.ToolCalls { - fmt.Fprintf(&sb, "tool_call name=%s args=%s\n", tc.Name, truncate(string(tc.Arguments), transcriptMessageCap)) + fmt.Fprintf(&sb, "tool_call name=%s args=%s\n", tc.Name, truncate(redactToolArgs(tc.Name, string(tc.Arguments)), transcriptMessageCap)) } for _, tr := range m.ToolResults { fmt.Fprintf(&sb, "tool_result name=%s body=%s\n", tr.Name, truncate(tr.Content, transcriptMessageCap)) diff --git a/model/cloud_sync.go b/model/cloud_sync.go index c214ca1..9abd00e 100644 --- a/model/cloud_sync.go +++ b/model/cloud_sync.go @@ -314,7 +314,7 @@ func (c *CloudOllamaLimitCache) fetchTags(ctx context.Context) ([]string, error) return nil, err } defer resp.Body.Close() - body, err := io.ReadAll(io.LimitReader(resp.Body, maxLimitCacheResponseBytes)) + body, err := readCapped(resp.Body) if err != nil { return nil, err } @@ -367,7 +367,7 @@ func (c *CloudOllamaLimitCache) fetchContextLength(ctx context.Context, modelNam return 0, err } defer resp.Body.Close() - respBody, err := io.ReadAll(io.LimitReader(resp.Body, maxLimitCacheResponseBytes)) + respBody, err := readCapped(resp.Body) if err != nil { return 0, err } @@ -456,3 +456,19 @@ func truncate(b []byte, n int) string { // (/api/tags, /api/show) so a misbehaving endpoint can't stream an unbounded // body before the 15s timeout fires. 1 MiB is far above any real response. const maxLimitCacheResponseBytes = 1 << 20 + +// readCapped reads up to maxLimitCacheResponseBytes from r and returns a clear +// error if the response EXCEEDS the cap — rather than silently truncating (as a +// bare io.LimitReader does) and letting downstream json.Unmarshal fail with an +// opaque "unexpected end of JSON input". It reads one extra byte to detect the +// overflow. +func readCapped(r io.Reader) ([]byte, error) { + body, err := io.ReadAll(io.LimitReader(r, maxLimitCacheResponseBytes+1)) + if err != nil { + return nil, err + } + if len(body) > maxLimitCacheResponseBytes { + return nil, fmt.Errorf("cloud_sync: response exceeded %d bytes", maxLimitCacheResponseBytes) + } + return body, nil +} diff --git a/model/sink.go b/model/sink.go index 47575e2..664fedb 100644 --- a/model/sink.go +++ b/model/sink.go @@ -16,6 +16,11 @@ import ( // UsageSink receives one record per successful Generate through a model parsed // by this package (ParseModelRequest / ParseModelForContext). Implement it to // meter or bill; the token detail mirrors majordomo's Response.Usage. +// +// IMPORTANT: cacheReadTokens and cacheWriteTokens are PORTIONS of inputTokens, +// not independent additive values (they let a sink price cached vs fresh input +// differently). A sink must NOT compute total = input+output+cacheRead+ +// cacheWrite — that double-counts the cached input. type UsageSink interface { Record(ctx context.Context, model string, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens int) } diff --git a/run/executor.go b/run/executor.go index c1470fb..53d6940 100644 --- a/run/executor.go +++ b/run/executor.go @@ -2,6 +2,7 @@ package run import ( "context" + "errors" "fmt" "time" @@ -130,11 +131,18 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio res.Err = fmt.Errorf("resolve model %q: %w", tier, err) return res } + if model == nil { + // A resolver returning (ctx, nil, nil) would otherwise nil-panic inside + // the agent loop; surface it as a clean error (Run never panics out). + res.Err = fmt.Errorf("resolve model %q: resolver returned a nil model", tier) + return res + } ctx = modelCtx // Audit start (optional). The recorder satisfies RunTally; stamp it on the // invocation so a self-status tool can read live progress. var rec RunRecorder + var stateAcc *RunStateAccessor if e.cfg.Ports.Audit != nil { rec = e.cfg.Ports.Audit.StartRun(ctx, RunInfo{ RunID: inv.RunID, @@ -148,7 +156,8 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio }) } if rec != nil { - inv.RunState = NewRunStateAccessor(rec, maxIter, 0, started) + stateAcc = NewRunStateAccessor(rec, maxIter, 0, started) + inv.RunState = stateAcc } // Build the toolbox from the agent's low-level tools. @@ -159,11 +168,27 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio return res } - // Step instrumentation: accumulate Result.Steps + fire inv.OnStep, and feed - // the audit recorder. majordomo's step observer hands us each completed - // iteration; we zip the model's tool calls with their executed results. + // Run context: bound by MaxRuntime, detached from the caller's deadline so a + // lane/queue wait doesn't eat the run budget (mort's V10 lesson). Caller + // cancellation still propagates via MergeCancellation. Created BEFORE the + // step observer so the observer forwards the merged run context (not a + // possibly-cancelled caller ctx) to OnStep consumers. + runCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), maxRuntime) + defer cancel() + runCtx, mergeCancel := MergeCancellation(runCtx, ctx) + defer mergeCancel() + + // Step instrumentation: accumulate Result.Steps + fire inv.OnStep, feed the + // audit recorder, and keep the live iteration counter fresh. majordomo's + // step observer hands us each completed iteration; we zip the model's tool + // calls with their executed results PAIRWISE — a result without a matching + // call (or a call without a result) is skipped rather than recorded as an + // empty-name "ghost" step. emitter := newStepEmitter(inv.OnStep) stepObserver := func(s agent.Step) { + if stateAcc != nil { + stateAcc.SetIteration(s.Index) + } if rec != nil { rec.OnStep(s.Index, s.Response) } @@ -171,27 +196,20 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio if s.Response != nil { calls = s.Response.ToolCalls } - for i, r := range s.Results { - var call llm.ToolCall - if i < len(calls) { - call = calls[i] - } - emitter.toolStart(ctx, call.Name, call.Arguments) - emitter.toolEnd(ctx, call, r.Content, r.IsError) + n := len(s.Results) + if len(calls) < n { + n = len(calls) + } + for i := 0; i < n; i++ { + call, r := calls[i], s.Results[i] + emitter.toolStart(runCtx, call.Name, call.Arguments) + emitter.toolEnd(runCtx, call, r.Content, r.IsError) if rec != nil { rec.OnTool(call, r.Content) } } } - // Run context: bound by MaxRuntime, detached from the caller's deadline so a - // lane/queue wait doesn't eat the run budget (mort's V10 lesson). Caller - // cancellation still propagates via MergeCancellation. - runCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), maxRuntime) - defer cancel() - runCtx, mergeCancel := MergeCancellation(runCtx, ctx) - defer mergeCancel() - opts := []agent.Option{ agent.WithToolbox(toolbox), agent.WithMaxSteps(maxIter), @@ -200,17 +218,27 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio } if e.cfg.Compactor != nil && e.cfg.ContextTokens != nil { if threshold := e.compactionThreshold(tier); threshold > 0 { - opts = append(opts, agent.WithCompactor(e.cfg.Compactor(threshold, nil))) + // Forward compaction events to the audit log (makes the + // CompactionEvent doc's "logged to the run trace" promise true). + var onFire func(compact.CompactionEvent) + if rec != nil { + onFire = func(ev compact.CompactionEvent) { + rec.LogEvent("compaction_fired", map[string]any{ + "messages_before": ev.MessagesBefore, + "messages_after": ev.MessagesAfter, + "tokens_before": ev.TokensBefore, + "tokens_after": ev.TokensAfter, + }) + } + } + opts = append(opts, agent.WithCompactor(e.cfg.Compactor(threshold, onFire))) } } ag := agent.New(model, e.systemPrompt(ra), opts...) runRes, runErr := ag.Run(runCtx, input) - status := "ok" - if runErr != nil { - status = "error" - } + status := statusFor(runErr) if runRes != nil { res.Output = runRes.Output res.Usage = runRes.Usage @@ -225,6 +253,22 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio return res } +// statusFor maps a run error to a RunStats.Status, distinguishing a deadline +// (timeout) and a cancellation (cancelled — caller cancel or shutdown) from a +// generic error so audit consumers can tell them apart. +func statusFor(runErr error) string { + switch { + case runErr == nil: + return "ok" + case errors.Is(runErr, context.DeadlineExceeded): + return "timeout" + case errors.Is(runErr, context.Canceled): + return "cancelled" + default: + return "error" + } +} + // finishAudit writes the terminal roll-up on a detached context so a cancelled // run still records (mort's CleanupContextTimeout lesson). func (e *Executor) finishAudit(ctx context.Context, rec RunRecorder, status string, res Result, started time.Time, runErr error) { diff --git a/run/executor_test.go b/run/executor_test.go index dc4101e..250932c 100644 --- a/run/executor_test.go +++ b/run/executor_test.go @@ -3,6 +3,7 @@ package run import ( "context" "errors" + "fmt" "testing" "gitea.stevedudenhoeffer.com/steve/majordomo/llm" @@ -130,3 +131,38 @@ func (r *captureRecorder) OnTool(llm.ToolCall, string) { r.tools++ } func (r *captureRecorder) LogEvent(string, map[string]any) {} func (r *captureRecorder) LogError(string) {} func (r *captureRecorder) Close(_ context.Context, s RunStats) { r.closed = true; r.stats = s } + +// TestExecutorNilModelNoPanic: a resolver returning (ctx, nil, nil) yields a +// clean error, not a nil-pointer panic (gadfly F1, high severity). +func TestExecutorNilModelNoPanic(t *testing.T) { + ex := New(Config{ + Registry: tool.NewRegistry(), + Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) { + return ctx, nil, nil // nil model, nil error + }, + }) + res := ex.Run(context.Background(), + RunnableAgent{ModelTier: "x"}, tool.Invocation{RunID: "r"}, "hi") + if res.Err == nil { + t.Fatal("expected an error for a nil model, got nil (would have panicked in the loop)") + } +} + +// TestStatusFor maps run errors to RunStats.Status (gadfly F3). +func TestStatusFor(t *testing.T) { + cases := []struct { + err error + want string + }{ + {nil, "ok"}, + {context.DeadlineExceeded, "timeout"}, + {context.Canceled, "cancelled"}, + {fmt.Errorf("wrapped: %w", context.DeadlineExceeded), "timeout"}, + {errors.New("boom"), "error"}, + } + for _, c := range cases { + if got := statusFor(c.err); got != c.want { + t.Errorf("statusFor(%v) = %q, want %q", c.err, got, c.want) + } + } +} diff --git a/run/steps.go b/run/steps.go index 670e09a..16d6d08 100644 --- a/run/steps.go +++ b/run/steps.go @@ -1,11 +1,10 @@ package run // steps.go — the per-run step emitter and the tool→step presentation -// mapping. This is the single place that turns the executor's two loop -// chokepoints (the pre-dispatch tool hook + the post-step observer in -// executor.go) into ordered tool.Step records: one per tool call, -// each with a stable id, an open-vocabulary kind, and a human -// present-tense summary that flips running→complete/error. +// mapping. This is the single place that turns the executor's post-step +// observer into ordered tool.Step records: one per tool call, each with a +// stable id, an open-vocabulary kind, and a human present-tense summary +// that flips running→complete/error. // // One source feeds two consumers (mirroring the OnEvent/OnToolEvent/ // PostRunResult pattern): the live tool.Invocation.OnStep callback @@ -13,6 +12,16 @@ package run // Because the Result accumulation does not depend on OnStep being set, // every surface — chat (JSON + SSE), Discord, cron, sub-agents — carries // steps; OnStep is needed only for live streaming. +// +// LIMITATION (current): majordomo exposes only a POST-step observer, so +// the executor calls toolStart+toolEnd back-to-back after each tool has +// already run. Steps are therefore recorded faithfully, but step.StartedAt +// ≈ EndedAt and the intermediate "running" phase is never observable to a +// live OnStep consumer. A pre-dispatch hook (wrapping each tool's handler +// to emit toolStart before execution, like mort's state-react decorator) +// is a follow-up that would restore real start timing + the running phase. +// The emitter already supports that two-call shape — toolStart and toolEnd +// are separate methods — so wiring it later is additive. import ( "context" @@ -35,8 +44,8 @@ const stepSummaryMaxLen = 200 // stepEmitter accumulates ordered steps for one run and fires the live // OnStep callback. // -// Concurrency: touched ONLY from the agent-loop goroutine. Both call -// sites (the hookToolbox `before` closure and the stepObserver) run +// Concurrency: touched ONLY from the agent-loop goroutine — the executor's +// stepObserver (and, once a pre-dispatch hook is wired, that hook) run // there; majordomo executes a step's tool calls sequentially, and // sub-agents build their own Invocation so they never reach this // emitter. Same single-goroutine contract as the audit Writer and the @@ -46,7 +55,7 @@ type stepEmitter struct { now func() time.Time seq int - steps []tool.Step // ordered; the snapshot for Result.Steps + steps []tool.Step // ordered; the snapshot for Result.Steps byID map[string]int // step id -> index into steps pending map[string][]string // correlation key -> queued running ids (FIFO) } @@ -126,9 +135,12 @@ func (e *stepEmitter) newStep(name string, args json.RawMessage) tool.Step { return step } -// matchPending pops the oldest running step id for (name, args). Falls -// back to the most recent still-running step of the same tool name when -// the args don't byte-match between start and end. Returns "" on no match. +// matchPending pops the oldest running step id for (name, args). Falls back to +// the OLDEST still-running step of the same tool name when the args don't +// byte-match between start and end (e.g. JSON key reordering). FIFO on the +// fallback too, consistent with the per-key queue pop above — closing the +// oldest avoids mis-correlating concurrent same-named calls. Returns "" on no +// match. func (e *stepEmitter) matchPending(name string, args json.RawMessage) string { key := corrKey(name, args) if q := e.pending[key]; len(q) > 0 { @@ -140,7 +152,7 @@ func (e *stepEmitter) matchPending(name string, args json.RawMessage) string { } return id } - for i := len(e.steps) - 1; i >= 0; i-- { + for i := 0; i < len(e.steps); i++ { if e.steps[i].Title == name && e.steps[i].Status == "running" { return e.steps[i].ID }