fix: address verified gadfly P2 findings (9 real of 18)
Independently verified all 18 gadfly findings against the code (18-agent
fan-out). Fixed the 9 real ones; the other 9 were false-positive /
hallucinated / valid-tradeoff (no change).
High:
- F1 nil model: a Models resolver returning (ctx,nil,nil) flowed into the
agent loop and nil-panicked. Now a clean error (Run never panics). +test.
- F9 compactor data-leak: renderTranscript sent tool-call args verbatim to
the summarizer (a possibly-different provider/tier); secret-bearing tool
args (mcp_call/email_send/http_*/webhook_*) are now redacted, with a doc
note that result bodies still flow (summary needs them).
Medium/minor:
- F2 compactor error path returned the folded slice, not the original msgs
(contradicting the documented non-fatal contract) -> return msgs.
- F3 RunStats.Status only ok/error; now timeout (DeadlineExceeded) /
cancelled (Canceled) via statusFor. +test.
- F4 step-zip emitted empty-name "ghost" steps when results>calls; now pairs
min(calls,results) only.
- F5 SetIteration was never called -> RunState.Iteration always 0; the step
observer now updates it each loop.
- F6 matchPending fallback was LIFO; now FIFO (matches the per-key queue).
- F7 estimateTokens had no default arm (future Part kinds counted as 0);
unknown parts now counted conservatively.
- F8 cloud_sync silently truncated >1MiB responses -> opaque JSON error; now
a clear "response exceeded N bytes" via readCapped.
- F12 step observer captured the caller ctx; now the merged runCtx.
- F13 compaction onFire was nil (doc claimed it logged); now wired to
audit LogEvent("compaction_fired").
- F11 (no pre-dispatch hook in majordomo) documented honestly as a known
limitation; F18 UsageSink doc clarified cache tokens are subsets of input.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
+42
-4
@@ -180,8 +180,11 @@ func compactIfNeeded(ctx context.Context, cfg CompactorConfig, st *compactionSta
|
|||||||
|
|
||||||
summary, err := summariseMiddle(ctx, cfg, st.summaryText, middle)
|
summary, err := summariseMiddle(ctx, cfg, st.summaryText, middle)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Non-fatal upstream: the agent loop sends the original slice.
|
// Non-fatal upstream: the agent loop sends the ORIGINAL slice. Return
|
||||||
return rendered, fmt.Errorf("compactor: summarise middle: %w", err)
|
// msgs, not `rendered` — on a second+ compaction `rendered` already
|
||||||
|
// carries a prior synthetic summary, which is not the documented
|
||||||
|
// "original slice" the loop expects on a compactor error.
|
||||||
|
return msgs, fmt.Errorf("compactor: summarise middle: %w", err)
|
||||||
}
|
}
|
||||||
st.summaryText = summary
|
st.summaryText = summary
|
||||||
st.prefixEnd = endMiddle
|
st.prefixEnd = endMiddle
|
||||||
@@ -285,6 +288,14 @@ func estimateTokens(msgs []llm.Message) int {
|
|||||||
chars += len(v.Text)
|
chars += len(v.Text)
|
||||||
case llm.ImagePart:
|
case llm.ImagePart:
|
||||||
chars += 4096
|
chars += 4096
|
||||||
|
default:
|
||||||
|
// llm.Part is a sealed-but-extensible interface (future media
|
||||||
|
// kinds). Count an unknown part conservatively (like an image)
|
||||||
|
// rather than 0, so a transcript of unrecognised content can't
|
||||||
|
// silently slip under the compaction threshold and 400 the
|
||||||
|
// model. Bump this if a large new part kind lands.
|
||||||
|
_ = v
|
||||||
|
chars += 4096
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, tc := range m.ToolCalls {
|
for _, tc := range m.ToolCalls {
|
||||||
@@ -302,9 +313,36 @@ func estimateTokens(msgs []llm.Message) int {
|
|||||||
// summarizer.
|
// summarizer.
|
||||||
const transcriptMessageCap = 2048
|
const transcriptMessageCap = 2048
|
||||||
|
|
||||||
|
// secretBearingTools name tools whose ARGUMENTS routinely carry credentials or
|
||||||
|
// message bodies (bearer tokens, API keys, recipients, request bodies). Their
|
||||||
|
// args are dropped before the transcript reaches the summarizer model — which
|
||||||
|
// may be a different provider/tier than the run model — mirroring the redaction
|
||||||
|
// run/steps.go applies to user-facing step summaries. http_* and webhook_* are
|
||||||
|
// matched by prefix below.
|
||||||
|
var secretBearingTools = map[string]bool{
|
||||||
|
"mcp_call": true,
|
||||||
|
"email_send": true,
|
||||||
|
}
|
||||||
|
|
||||||
|
// redactToolArgs returns a summariser-safe rendering of a tool call's args:
|
||||||
|
// "[redacted]" for known secret-bearing tools, the args verbatim otherwise.
|
||||||
|
func redactToolArgs(name, args string) string {
|
||||||
|
if secretBearingTools[name] ||
|
||||||
|
strings.HasPrefix(name, "http_") ||
|
||||||
|
strings.HasPrefix(name, "webhook_") {
|
||||||
|
return "[redacted]"
|
||||||
|
}
|
||||||
|
return args
|
||||||
|
}
|
||||||
|
|
||||||
// renderTranscript flattens a message slice to a plain-text transcript
|
// renderTranscript flattens a message slice to a plain-text transcript
|
||||||
// suitable for the summarisation prompt. Tool calls show name + args,
|
// suitable for the summarisation prompt. Tool calls show name + (redacted) args,
|
||||||
// tool results show name + body. Empty fields are skipped.
|
// tool results show name + body. Empty fields are skipped.
|
||||||
|
//
|
||||||
|
// NOTE: tool-RESULT bodies are forwarded to the summarizer (the summary needs
|
||||||
|
// the findings). A host whose tool results may contain secrets and whose
|
||||||
|
// summarizer tier resolves to an untrusted provider should ensure that tier is
|
||||||
|
// trusted, or pre-sanitise results before they reach the agent loop.
|
||||||
func renderTranscript(msgs []llm.Message) string {
|
func renderTranscript(msgs []llm.Message) string {
|
||||||
var sb strings.Builder
|
var sb strings.Builder
|
||||||
for i, m := range msgs {
|
for i, m := range msgs {
|
||||||
@@ -314,7 +352,7 @@ func renderTranscript(msgs []llm.Message) string {
|
|||||||
sb.WriteString("\n")
|
sb.WriteString("\n")
|
||||||
}
|
}
|
||||||
for _, tc := range m.ToolCalls {
|
for _, tc := range m.ToolCalls {
|
||||||
fmt.Fprintf(&sb, "tool_call name=%s args=%s\n", tc.Name, truncate(string(tc.Arguments), transcriptMessageCap))
|
fmt.Fprintf(&sb, "tool_call name=%s args=%s\n", tc.Name, truncate(redactToolArgs(tc.Name, string(tc.Arguments)), transcriptMessageCap))
|
||||||
}
|
}
|
||||||
for _, tr := range m.ToolResults {
|
for _, tr := range m.ToolResults {
|
||||||
fmt.Fprintf(&sb, "tool_result name=%s body=%s\n", tr.Name, truncate(tr.Content, transcriptMessageCap))
|
fmt.Fprintf(&sb, "tool_result name=%s body=%s\n", tr.Name, truncate(tr.Content, transcriptMessageCap))
|
||||||
|
|||||||
+18
-2
@@ -314,7 +314,7 @@ func (c *CloudOllamaLimitCache) fetchTags(ctx context.Context) ([]string, error)
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
body, err := io.ReadAll(io.LimitReader(resp.Body, maxLimitCacheResponseBytes))
|
body, err := readCapped(resp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@@ -367,7 +367,7 @@ func (c *CloudOllamaLimitCache) fetchContextLength(ctx context.Context, modelNam
|
|||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
respBody, err := io.ReadAll(io.LimitReader(resp.Body, maxLimitCacheResponseBytes))
|
respBody, err := readCapped(resp.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, err
|
return 0, err
|
||||||
}
|
}
|
||||||
@@ -456,3 +456,19 @@ func truncate(b []byte, n int) string {
|
|||||||
// (/api/tags, /api/show) so a misbehaving endpoint can't stream an unbounded
|
// (/api/tags, /api/show) so a misbehaving endpoint can't stream an unbounded
|
||||||
// body before the 15s timeout fires. 1 MiB is far above any real response.
|
// body before the 15s timeout fires. 1 MiB is far above any real response.
|
||||||
const maxLimitCacheResponseBytes = 1 << 20
|
const maxLimitCacheResponseBytes = 1 << 20
|
||||||
|
|
||||||
|
// readCapped reads up to maxLimitCacheResponseBytes from r and returns a clear
|
||||||
|
// error if the response EXCEEDS the cap — rather than silently truncating (as a
|
||||||
|
// bare io.LimitReader does) and letting downstream json.Unmarshal fail with an
|
||||||
|
// opaque "unexpected end of JSON input". It reads one extra byte to detect the
|
||||||
|
// overflow.
|
||||||
|
func readCapped(r io.Reader) ([]byte, error) {
|
||||||
|
body, err := io.ReadAll(io.LimitReader(r, maxLimitCacheResponseBytes+1))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if len(body) > maxLimitCacheResponseBytes {
|
||||||
|
return nil, fmt.Errorf("cloud_sync: response exceeded %d bytes", maxLimitCacheResponseBytes)
|
||||||
|
}
|
||||||
|
return body, nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -16,6 +16,11 @@ import (
|
|||||||
// UsageSink receives one record per successful Generate through a model parsed
|
// UsageSink receives one record per successful Generate through a model parsed
|
||||||
// by this package (ParseModelRequest / ParseModelForContext). Implement it to
|
// by this package (ParseModelRequest / ParseModelForContext). Implement it to
|
||||||
// meter or bill; the token detail mirrors majordomo's Response.Usage.
|
// meter or bill; the token detail mirrors majordomo's Response.Usage.
|
||||||
|
//
|
||||||
|
// IMPORTANT: cacheReadTokens and cacheWriteTokens are PORTIONS of inputTokens,
|
||||||
|
// not independent additive values (they let a sink price cached vs fresh input
|
||||||
|
// differently). A sink must NOT compute total = input+output+cacheRead+
|
||||||
|
// cacheWrite — that double-counts the cached input.
|
||||||
type UsageSink interface {
|
type UsageSink interface {
|
||||||
Record(ctx context.Context, model string, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens int)
|
Record(ctx context.Context, model string, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens int)
|
||||||
}
|
}
|
||||||
|
|||||||
+68
-24
@@ -2,6 +2,7 @@ package run
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -130,11 +131,18 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
|||||||
res.Err = fmt.Errorf("resolve model %q: %w", tier, err)
|
res.Err = fmt.Errorf("resolve model %q: %w", tier, err)
|
||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
if model == nil {
|
||||||
|
// A resolver returning (ctx, nil, nil) would otherwise nil-panic inside
|
||||||
|
// the agent loop; surface it as a clean error (Run never panics out).
|
||||||
|
res.Err = fmt.Errorf("resolve model %q: resolver returned a nil model", tier)
|
||||||
|
return res
|
||||||
|
}
|
||||||
ctx = modelCtx
|
ctx = modelCtx
|
||||||
|
|
||||||
// Audit start (optional). The recorder satisfies RunTally; stamp it on the
|
// Audit start (optional). The recorder satisfies RunTally; stamp it on the
|
||||||
// invocation so a self-status tool can read live progress.
|
// invocation so a self-status tool can read live progress.
|
||||||
var rec RunRecorder
|
var rec RunRecorder
|
||||||
|
var stateAcc *RunStateAccessor
|
||||||
if e.cfg.Ports.Audit != nil {
|
if e.cfg.Ports.Audit != nil {
|
||||||
rec = e.cfg.Ports.Audit.StartRun(ctx, RunInfo{
|
rec = e.cfg.Ports.Audit.StartRun(ctx, RunInfo{
|
||||||
RunID: inv.RunID,
|
RunID: inv.RunID,
|
||||||
@@ -148,7 +156,8 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
if rec != nil {
|
if rec != nil {
|
||||||
inv.RunState = NewRunStateAccessor(rec, maxIter, 0, started)
|
stateAcc = NewRunStateAccessor(rec, maxIter, 0, started)
|
||||||
|
inv.RunState = stateAcc
|
||||||
}
|
}
|
||||||
|
|
||||||
// Build the toolbox from the agent's low-level tools.
|
// Build the toolbox from the agent's low-level tools.
|
||||||
@@ -159,11 +168,27 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
|||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
// Step instrumentation: accumulate Result.Steps + fire inv.OnStep, and feed
|
// Run context: bound by MaxRuntime, detached from the caller's deadline so a
|
||||||
// the audit recorder. majordomo's step observer hands us each completed
|
// lane/queue wait doesn't eat the run budget (mort's V10 lesson). Caller
|
||||||
// iteration; we zip the model's tool calls with their executed results.
|
// cancellation still propagates via MergeCancellation. Created BEFORE the
|
||||||
|
// step observer so the observer forwards the merged run context (not a
|
||||||
|
// possibly-cancelled caller ctx) to OnStep consumers.
|
||||||
|
runCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), maxRuntime)
|
||||||
|
defer cancel()
|
||||||
|
runCtx, mergeCancel := MergeCancellation(runCtx, ctx)
|
||||||
|
defer mergeCancel()
|
||||||
|
|
||||||
|
// Step instrumentation: accumulate Result.Steps + fire inv.OnStep, feed the
|
||||||
|
// audit recorder, and keep the live iteration counter fresh. majordomo's
|
||||||
|
// step observer hands us each completed iteration; we zip the model's tool
|
||||||
|
// calls with their executed results PAIRWISE — a result without a matching
|
||||||
|
// call (or a call without a result) is skipped rather than recorded as an
|
||||||
|
// empty-name "ghost" step.
|
||||||
emitter := newStepEmitter(inv.OnStep)
|
emitter := newStepEmitter(inv.OnStep)
|
||||||
stepObserver := func(s agent.Step) {
|
stepObserver := func(s agent.Step) {
|
||||||
|
if stateAcc != nil {
|
||||||
|
stateAcc.SetIteration(s.Index)
|
||||||
|
}
|
||||||
if rec != nil {
|
if rec != nil {
|
||||||
rec.OnStep(s.Index, s.Response)
|
rec.OnStep(s.Index, s.Response)
|
||||||
}
|
}
|
||||||
@@ -171,27 +196,20 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
|||||||
if s.Response != nil {
|
if s.Response != nil {
|
||||||
calls = s.Response.ToolCalls
|
calls = s.Response.ToolCalls
|
||||||
}
|
}
|
||||||
for i, r := range s.Results {
|
n := len(s.Results)
|
||||||
var call llm.ToolCall
|
if len(calls) < n {
|
||||||
if i < len(calls) {
|
n = len(calls)
|
||||||
call = calls[i]
|
}
|
||||||
}
|
for i := 0; i < n; i++ {
|
||||||
emitter.toolStart(ctx, call.Name, call.Arguments)
|
call, r := calls[i], s.Results[i]
|
||||||
emitter.toolEnd(ctx, call, r.Content, r.IsError)
|
emitter.toolStart(runCtx, call.Name, call.Arguments)
|
||||||
|
emitter.toolEnd(runCtx, call, r.Content, r.IsError)
|
||||||
if rec != nil {
|
if rec != nil {
|
||||||
rec.OnTool(call, r.Content)
|
rec.OnTool(call, r.Content)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run context: bound by MaxRuntime, detached from the caller's deadline so a
|
|
||||||
// lane/queue wait doesn't eat the run budget (mort's V10 lesson). Caller
|
|
||||||
// cancellation still propagates via MergeCancellation.
|
|
||||||
runCtx, cancel := context.WithTimeout(context.WithoutCancel(ctx), maxRuntime)
|
|
||||||
defer cancel()
|
|
||||||
runCtx, mergeCancel := MergeCancellation(runCtx, ctx)
|
|
||||||
defer mergeCancel()
|
|
||||||
|
|
||||||
opts := []agent.Option{
|
opts := []agent.Option{
|
||||||
agent.WithToolbox(toolbox),
|
agent.WithToolbox(toolbox),
|
||||||
agent.WithMaxSteps(maxIter),
|
agent.WithMaxSteps(maxIter),
|
||||||
@@ -200,17 +218,27 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
|||||||
}
|
}
|
||||||
if e.cfg.Compactor != nil && e.cfg.ContextTokens != nil {
|
if e.cfg.Compactor != nil && e.cfg.ContextTokens != nil {
|
||||||
if threshold := e.compactionThreshold(tier); threshold > 0 {
|
if threshold := e.compactionThreshold(tier); threshold > 0 {
|
||||||
opts = append(opts, agent.WithCompactor(e.cfg.Compactor(threshold, nil)))
|
// Forward compaction events to the audit log (makes the
|
||||||
|
// CompactionEvent doc's "logged to the run trace" promise true).
|
||||||
|
var onFire func(compact.CompactionEvent)
|
||||||
|
if rec != nil {
|
||||||
|
onFire = func(ev compact.CompactionEvent) {
|
||||||
|
rec.LogEvent("compaction_fired", map[string]any{
|
||||||
|
"messages_before": ev.MessagesBefore,
|
||||||
|
"messages_after": ev.MessagesAfter,
|
||||||
|
"tokens_before": ev.TokensBefore,
|
||||||
|
"tokens_after": ev.TokensAfter,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
opts = append(opts, agent.WithCompactor(e.cfg.Compactor(threshold, onFire)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
ag := agent.New(model, e.systemPrompt(ra), opts...)
|
ag := agent.New(model, e.systemPrompt(ra), opts...)
|
||||||
runRes, runErr := ag.Run(runCtx, input)
|
runRes, runErr := ag.Run(runCtx, input)
|
||||||
|
|
||||||
status := "ok"
|
status := statusFor(runErr)
|
||||||
if runErr != nil {
|
|
||||||
status = "error"
|
|
||||||
}
|
|
||||||
if runRes != nil {
|
if runRes != nil {
|
||||||
res.Output = runRes.Output
|
res.Output = runRes.Output
|
||||||
res.Usage = runRes.Usage
|
res.Usage = runRes.Usage
|
||||||
@@ -225,6 +253,22 @@ func (e *Executor) Run(ctx context.Context, ra RunnableAgent, inv tool.Invocatio
|
|||||||
return res
|
return res
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// statusFor maps a run error to a RunStats.Status, distinguishing a deadline
|
||||||
|
// (timeout) and a cancellation (cancelled — caller cancel or shutdown) from a
|
||||||
|
// generic error so audit consumers can tell them apart.
|
||||||
|
func statusFor(runErr error) string {
|
||||||
|
switch {
|
||||||
|
case runErr == nil:
|
||||||
|
return "ok"
|
||||||
|
case errors.Is(runErr, context.DeadlineExceeded):
|
||||||
|
return "timeout"
|
||||||
|
case errors.Is(runErr, context.Canceled):
|
||||||
|
return "cancelled"
|
||||||
|
default:
|
||||||
|
return "error"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// finishAudit writes the terminal roll-up on a detached context so a cancelled
|
// finishAudit writes the terminal roll-up on a detached context so a cancelled
|
||||||
// run still records (mort's CleanupContextTimeout lesson).
|
// run still records (mort's CleanupContextTimeout lesson).
|
||||||
func (e *Executor) finishAudit(ctx context.Context, rec RunRecorder, status string, res Result, started time.Time, runErr error) {
|
func (e *Executor) finishAudit(ctx context.Context, rec RunRecorder, status string, res Result, started time.Time, runErr error) {
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package run
|
|||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"errors"
|
"errors"
|
||||||
|
"fmt"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||||
@@ -130,3 +131,38 @@ func (r *captureRecorder) OnTool(llm.ToolCall, string) { r.tools++ }
|
|||||||
func (r *captureRecorder) LogEvent(string, map[string]any) {}
|
func (r *captureRecorder) LogEvent(string, map[string]any) {}
|
||||||
func (r *captureRecorder) LogError(string) {}
|
func (r *captureRecorder) LogError(string) {}
|
||||||
func (r *captureRecorder) Close(_ context.Context, s RunStats) { r.closed = true; r.stats = s }
|
func (r *captureRecorder) Close(_ context.Context, s RunStats) { r.closed = true; r.stats = s }
|
||||||
|
|
||||||
|
// TestExecutorNilModelNoPanic: a resolver returning (ctx, nil, nil) yields a
|
||||||
|
// clean error, not a nil-pointer panic (gadfly F1, high severity).
|
||||||
|
func TestExecutorNilModelNoPanic(t *testing.T) {
|
||||||
|
ex := New(Config{
|
||||||
|
Registry: tool.NewRegistry(),
|
||||||
|
Models: func(ctx context.Context, _ string) (context.Context, llm.Model, error) {
|
||||||
|
return ctx, nil, nil // nil model, nil error
|
||||||
|
},
|
||||||
|
})
|
||||||
|
res := ex.Run(context.Background(),
|
||||||
|
RunnableAgent{ModelTier: "x"}, tool.Invocation{RunID: "r"}, "hi")
|
||||||
|
if res.Err == nil {
|
||||||
|
t.Fatal("expected an error for a nil model, got nil (would have panicked in the loop)")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestStatusFor maps run errors to RunStats.Status (gadfly F3).
|
||||||
|
func TestStatusFor(t *testing.T) {
|
||||||
|
cases := []struct {
|
||||||
|
err error
|
||||||
|
want string
|
||||||
|
}{
|
||||||
|
{nil, "ok"},
|
||||||
|
{context.DeadlineExceeded, "timeout"},
|
||||||
|
{context.Canceled, "cancelled"},
|
||||||
|
{fmt.Errorf("wrapped: %w", context.DeadlineExceeded), "timeout"},
|
||||||
|
{errors.New("boom"), "error"},
|
||||||
|
}
|
||||||
|
for _, c := range cases {
|
||||||
|
if got := statusFor(c.err); got != c.want {
|
||||||
|
t.Errorf("statusFor(%v) = %q, want %q", c.err, got, c.want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
+24
-12
@@ -1,11 +1,10 @@
|
|||||||
package run
|
package run
|
||||||
|
|
||||||
// steps.go — the per-run step emitter and the tool→step presentation
|
// steps.go — the per-run step emitter and the tool→step presentation
|
||||||
// mapping. This is the single place that turns the executor's two loop
|
// mapping. This is the single place that turns the executor's post-step
|
||||||
// chokepoints (the pre-dispatch tool hook + the post-step observer in
|
// observer into ordered tool.Step records: one per tool call, each with a
|
||||||
// executor.go) into ordered tool.Step records: one per tool call,
|
// stable id, an open-vocabulary kind, and a human present-tense summary
|
||||||
// each with a stable id, an open-vocabulary kind, and a human
|
// that flips running→complete/error.
|
||||||
// present-tense summary that flips running→complete/error.
|
|
||||||
//
|
//
|
||||||
// One source feeds two consumers (mirroring the OnEvent/OnToolEvent/
|
// One source feeds two consumers (mirroring the OnEvent/OnToolEvent/
|
||||||
// PostRunResult pattern): the live tool.Invocation.OnStep callback
|
// PostRunResult pattern): the live tool.Invocation.OnStep callback
|
||||||
@@ -13,6 +12,16 @@ package run
|
|||||||
// Because the Result accumulation does not depend on OnStep being set,
|
// Because the Result accumulation does not depend on OnStep being set,
|
||||||
// every surface — chat (JSON + SSE), Discord, cron, sub-agents — carries
|
// every surface — chat (JSON + SSE), Discord, cron, sub-agents — carries
|
||||||
// steps; OnStep is needed only for live streaming.
|
// steps; OnStep is needed only for live streaming.
|
||||||
|
//
|
||||||
|
// LIMITATION (current): majordomo exposes only a POST-step observer, so
|
||||||
|
// the executor calls toolStart+toolEnd back-to-back after each tool has
|
||||||
|
// already run. Steps are therefore recorded faithfully, but step.StartedAt
|
||||||
|
// ≈ EndedAt and the intermediate "running" phase is never observable to a
|
||||||
|
// live OnStep consumer. A pre-dispatch hook (wrapping each tool's handler
|
||||||
|
// to emit toolStart before execution, like mort's state-react decorator)
|
||||||
|
// is a follow-up that would restore real start timing + the running phase.
|
||||||
|
// The emitter already supports that two-call shape — toolStart and toolEnd
|
||||||
|
// are separate methods — so wiring it later is additive.
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
@@ -35,8 +44,8 @@ const stepSummaryMaxLen = 200
|
|||||||
// stepEmitter accumulates ordered steps for one run and fires the live
|
// stepEmitter accumulates ordered steps for one run and fires the live
|
||||||
// OnStep callback.
|
// OnStep callback.
|
||||||
//
|
//
|
||||||
// Concurrency: touched ONLY from the agent-loop goroutine. Both call
|
// Concurrency: touched ONLY from the agent-loop goroutine — the executor's
|
||||||
// sites (the hookToolbox `before` closure and the stepObserver) run
|
// stepObserver (and, once a pre-dispatch hook is wired, that hook) run
|
||||||
// there; majordomo executes a step's tool calls sequentially, and
|
// there; majordomo executes a step's tool calls sequentially, and
|
||||||
// sub-agents build their own Invocation so they never reach this
|
// sub-agents build their own Invocation so they never reach this
|
||||||
// emitter. Same single-goroutine contract as the audit Writer and the
|
// emitter. Same single-goroutine contract as the audit Writer and the
|
||||||
@@ -46,7 +55,7 @@ type stepEmitter struct {
|
|||||||
now func() time.Time
|
now func() time.Time
|
||||||
|
|
||||||
seq int
|
seq int
|
||||||
steps []tool.Step // ordered; the snapshot for Result.Steps
|
steps []tool.Step // ordered; the snapshot for Result.Steps
|
||||||
byID map[string]int // step id -> index into steps
|
byID map[string]int // step id -> index into steps
|
||||||
pending map[string][]string // correlation key -> queued running ids (FIFO)
|
pending map[string][]string // correlation key -> queued running ids (FIFO)
|
||||||
}
|
}
|
||||||
@@ -126,9 +135,12 @@ func (e *stepEmitter) newStep(name string, args json.RawMessage) tool.Step {
|
|||||||
return step
|
return step
|
||||||
}
|
}
|
||||||
|
|
||||||
// matchPending pops the oldest running step id for (name, args). Falls
|
// matchPending pops the oldest running step id for (name, args). Falls back to
|
||||||
// back to the most recent still-running step of the same tool name when
|
// the OLDEST still-running step of the same tool name when the args don't
|
||||||
// the args don't byte-match between start and end. Returns "" on no match.
|
// byte-match between start and end (e.g. JSON key reordering). FIFO on the
|
||||||
|
// fallback too, consistent with the per-key queue pop above — closing the
|
||||||
|
// oldest avoids mis-correlating concurrent same-named calls. Returns "" on no
|
||||||
|
// match.
|
||||||
func (e *stepEmitter) matchPending(name string, args json.RawMessage) string {
|
func (e *stepEmitter) matchPending(name string, args json.RawMessage) string {
|
||||||
key := corrKey(name, args)
|
key := corrKey(name, args)
|
||||||
if q := e.pending[key]; len(q) > 0 {
|
if q := e.pending[key]; len(q) > 0 {
|
||||||
@@ -140,7 +152,7 @@ func (e *stepEmitter) matchPending(name string, args json.RawMessage) string {
|
|||||||
}
|
}
|
||||||
return id
|
return id
|
||||||
}
|
}
|
||||||
for i := len(e.steps) - 1; i >= 0; i-- {
|
for i := 0; i < len(e.steps); i++ {
|
||||||
if e.steps[i].Title == name && e.steps[i].Status == "running" {
|
if e.steps[i].Title == name && e.steps[i].Status == "running" {
|
||||||
return e.steps[i].ID
|
return e.steps[i].ID
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user