executus/compact/compactor.go
steve 7b3da87c08 fix: address verified gadfly P2 findings (9 real of 18)
Independently verified all 18 gadfly findings against the code (18-agent
fan-out). Fixed the 9 real ones; the other 9 were false-positive /
hallucinated / valid-tradeoff (no change).

High:
- F1 nil model: a Models resolver returning (ctx,nil,nil) flowed into the
  agent loop and nil-panicked. Now a clean error (Run never panics). +test.
- F9 compactor data-leak: renderTranscript sent tool-call args verbatim to
  the summarizer (a possibly-different provider/tier); secret-bearing tool
  args (mcp_call/email_send/http_*/webhook_*) are now redacted, with a doc
  note that result bodies still flow (summary needs them).

Medium/minor:
- F2 compactor error path returned the folded slice, not the original msgs
  (contradicting the documented non-fatal contract) -> return msgs.
- F3 RunStats.Status only ok/error; now timeout (DeadlineExceeded) /
  cancelled (Canceled) via statusFor. +test.
- F4 step-zip emitted empty-name "ghost" steps when results>calls; now pairs
  min(calls,results) only.
- F5 SetIteration was never called -> RunState.Iteration always 0; the step
  observer now updates it each loop.
- F6 matchPending fallback was LIFO; now FIFO (matches the per-key queue).
- F7 estimateTokens had no default arm (future Part kinds counted as 0);
  unknown parts now counted conservatively.
- F8 cloud_sync silently truncated >1MiB responses -> opaque JSON error; now
  a clear "response exceeded N bytes" via readCapped.
- F12 step observer captured the caller ctx; now the merged runCtx.
- F13 compaction onFire was nil (doc claimed it logged); now wired to
  audit LogEvent("compaction_fired").
- F11 (no pre-dispatch hook in majordomo) documented honestly as a known
  limitation; F18 UsageSink doc clarified cache tokens are subsets of input.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-27 02:02:21 +00:00

370 lines
14 KiB
Go

// V15.2 context compactor (re-based on majordomo).
//
// Why: the agent loop accumulates tool results indefinitely. A
// research-heavy run with many web_search / read_page / http_get
// results easily crosses 200K tokens and trips the model's HTTP-400
// "prompt too long" rejection mid-run (observed at 410K tokens on
// qwen3-coder:480b which has a 262K cap). majordomo's agent loop calls
// the compactor with the full message slice before every model call;
// the compactor returns a shorter slice that preserves the system
// prompt + recent messages, with the middle range replaced by a
// synthetic summary.
//
// Strategy (unchanged from the agentkit era):
//   - Keep any leading system message verbatim. (Under majordomo the
//     system prompt normally travels in Request.System, not in the
//     message slice, so this is defensive.)
//   - Keep the last KeepRecent messages verbatim. This ensures the
//     agent has fresh tool state to act on; compacting too aggressively
//     would strip the in-flight context it needs to make the next
//     decision.
//   - Compress the middle range via a single fast-tier LLM call that
//     receives the middle messages as raw text and produces a paragraph
//     summary (URLs visited, key findings, file_ids created, what the
//     agent is trying to accomplish).
//   - Replace the middle range with one synthetic user-role message
//     containing the summary. (user-role chosen because tool-result-role
//     would be ambiguous without a matching tool_call_id.)
//
// What moved in the majordomo conversion:
//   - The token-threshold gate lives HERE now. agentkit estimated
//     tokens and only invoked the compactor past a configured
//     threshold; majordomo's hook fires before every model call, so
//     the threshold check (estimateTokens vs the per-run threshold the
//     executor computes from the model's context limit) is the
//     compactor's first step.
//   - The compactor is per-run STATEFUL: majordomo does not replace
//     the loop's internal transcript with the compacted slice (the
//     hook shapes only what is SENT), so without memory the middle
//     would be re-summarised from scratch on every step past the
//     threshold. The state remembers how far the transcript has been
//     folded into the running summary and folds the previous summary
//     into the next one instead of re-paying for it.
//
// Failure path: any error (LLM unavailable, malformed response, etc.)
// is returned to the agent loop, which treats compactor errors as
// non-fatal and sends the original slice — if the next model call hits
// the provider's limit, the existing HTTP-400 error path takes over.
package compact

import (
	"context"
	"errors"
	"fmt"
	"strings"
	"sync"

	"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
)

// ModelResolver resolves a tier/spec to a usable llm.Model (and an enriched
// context for usage attribution). model.ParseModelForContext satisfies it.
type ModelResolver func(ctx context.Context, tier string) (context.Context, llm.Model, error)

// Compactor is the per-run compaction hook handed to the agent loop
// (matches the signature agent.WithCompactor expects).
type Compactor = func(ctx context.Context, msgs []llm.Message) ([]llm.Message, error)

// CompactionEvent describes one fired compaction so the executor can
// log it to skill_run_logs ("compaction_fired").
type CompactionEvent struct {
	// MessagesBefore/After count the messages that would have been
	// sent without/with this compaction.
	MessagesBefore int
	MessagesAfter  int
	// TokensBefore/After are the estimateTokens values for the same
	// two slices.
	TokensBefore int
	TokensAfter  int
}

// CompactorFactory mints a fresh per-run Compactor bound to a token
// threshold. onFire (nil-safe) observes every compaction that actually
// fires. A non-positive threshold yields a pass-through compactor.
type CompactorFactory func(thresholdTokens int, onFire func(CompactionEvent)) Compactor

// CompactorConfig controls the v15.2 compactor's behaviour. Construct
// in mort.go from convars + the executor's ModelResolver.
type CompactorConfig struct {
	// Models resolves a model spec (typically "fast" or a specific tier)
	// to an llm.Model. Required; nil disables compaction.
	Models ModelResolver
	// SummarizerTier is the model tier used for the compression LLM call.
	// Production default "fast"; an admin may set "haiku" or a specific
	// model spec. Empty falls back to "fast".
	SummarizerTier string
	// KeepRecent is the number of trailing messages preserved verbatim.
	// Default 8. Lower values compact more aggressively (the next loop
	// iteration sees less recent context); higher values keep more.
	KeepRecent int
	// SummaryWordCap bounds the LLM-generated summary length. Default
	// 200 words ≈ 800 chars ≈ 200 tokens, small enough that the
	// compaction always shrinks the slice meaningfully.
	SummaryWordCap int
}

// NewCompactor returns a CompactorFactory implementing the middle-range
// summarisation strategy. nil cfg.Models returns a factory of no-op
// compactors that always return the input unchanged (degrades to
// v15.1 behaviour).
func NewCompactor(cfg CompactorConfig) CompactorFactory {
	if cfg.Models == nil {
		return func(int, func(CompactionEvent)) Compactor {
			return func(_ context.Context, msgs []llm.Message) ([]llm.Message, error) {
				return msgs, nil
			}
		}
	}
	if cfg.SummarizerTier == "" {
		cfg.SummarizerTier = "fast"
	}
	if cfg.KeepRecent <= 0 {
		cfg.KeepRecent = 8
	}
	if cfg.SummaryWordCap <= 0 {
		cfg.SummaryWordCap = 200
	}
	return func(threshold int, onFire func(CompactionEvent)) Compactor {
		st := &compactionState{}
		return func(ctx context.Context, msgs []llm.Message) ([]llm.Message, error) {
			return compactIfNeeded(ctx, cfg, st, threshold, onFire, msgs)
		}
	}
}

// compactionState is the per-run fold memory: msgs[:prefixEnd]
// (excluding a leading system message) are represented by summaryText
// in the rendered slice. Guarded by mu for safety although the agent
// loop invokes the hook from a single goroutine.
type compactionState struct {
	mu          sync.Mutex
	prefixEnd   int
	summaryText string
}

// compactIfNeeded is the workhorse: render the transcript with any
// existing fold applied, check the threshold, and fold more of the
// middle into the summary when the rendered size still exceeds it.
func compactIfNeeded(ctx context.Context, cfg CompactorConfig, st *compactionState,
	threshold int, onFire func(CompactionEvent), msgs []llm.Message) ([]llm.Message, error) {
	st.mu.Lock()
	defer st.mu.Unlock()

	rendered := renderCompacted(st, msgs)
	if threshold <= 0 {
		return rendered, nil
	}
	tokensBefore := estimateTokens(rendered)
	if tokensBefore < threshold {
		return rendered, nil
	}

	// Determine the new middle range to fold: everything between what
	// is already summarised (or the optional leading system message)
	// and the KeepRecent tail.
	startMiddle := st.prefixEnd
	if startMiddle == 0 && len(msgs) > 0 && msgs[0].Role == llm.RoleSystem {
		startMiddle = 1
	}
	endMiddle := len(msgs) - cfg.KeepRecent
	if endMiddle <= startMiddle {
		// Nothing new to fold (the tail alone exceeds the threshold).
		// Return the rendered slice; the model call may still succeed.
		return rendered, nil
	}
	middle := msgs[startMiddle:endMiddle]

	summary, err := summariseMiddle(ctx, cfg, st.summaryText, middle)
	if err != nil {
		// Non-fatal upstream: the agent loop sends the ORIGINAL slice. Return
		// msgs, not `rendered`: on a second+ compaction `rendered` already
		// carries a prior synthetic summary, which is not the documented
		// "original slice" the loop expects on a compactor error.
		return msgs, fmt.Errorf("compactor: summarise middle: %w", err)
	}
	st.summaryText = summary
	st.prefixEnd = endMiddle

	out := renderCompacted(st, msgs)
	if onFire != nil {
		onFire(CompactionEvent{
			MessagesBefore: len(rendered),
			MessagesAfter:  len(out),
			TokensBefore:   tokensBefore,
			TokensAfter:    estimateTokens(out),
		})
	}
	return out, nil
}

// renderCompacted applies the fold state to msgs: [optional system] +
// [synthetic summary] + msgs[prefixEnd:]. With no fold yet, msgs is
// returned unchanged.
func renderCompacted(st *compactionState, msgs []llm.Message) []llm.Message {
	if st.prefixEnd <= 0 || st.prefixEnd > len(msgs) {
		return msgs
	}
	tail := msgs[st.prefixEnd:]
	out := make([]llm.Message, 0, len(tail)+2)
	if msgs[0].Role == llm.RoleSystem {
		out = append(out, msgs[0])
	}
	out = append(out, llm.UserText(
		"[CONTEXT COMPACTED] The earlier portion of this conversation was summarised "+
			"to fit the model's context window. Summary:\n\n"+st.summaryText+
			"\n\nResume from the recent messages below."))
	out = append(out, tail...)
	return out
}

// summariseMiddle composes a "compress this transcript" prompt and
// fires one fast-tier LLM call. prevSummary (may be empty) is the
// running summary from earlier compactions; it is folded into the new
// summary so prior context is not lost.
func summariseMiddle(ctx context.Context, cfg CompactorConfig, prevSummary string, middle []llm.Message) (string, error) {
	if len(middle) == 0 {
		return "", errors.New("compactor: empty middle range")
	}
	modelCtx, model, err := cfg.Models(ctx, cfg.SummarizerTier)
	if err != nil {
		return "", fmt.Errorf("compactor: resolve summarizer model %q: %w", cfg.SummarizerTier, err)
	}
	if model == nil {
		return "", errors.New("compactor: summarizer model resolved to nil")
	}
	if modelCtx != nil {
		ctx = modelCtx
	}

	var prior string
	if strings.TrimSpace(prevSummary) != "" {
		prior = "AN EARLIER PORTION WAS ALREADY SUMMARISED AS:\n" + prevSummary +
			"\n\nFold that summary into your new one — its facts must survive.\n\n"
	}
	transcript := renderTranscript(middle)
	prompt := fmt.Sprintf(
		"You are compressing an in-flight agent's conversation transcript so the agent "+
			"can continue working without blowing its model context. The transcript below is "+
			"a sequence of tool calls and their results. Produce a single paragraph (under %d words) "+
			"that captures:\n"+
			" - WHAT the agent has been trying to accomplish.\n"+
			" - WHICH URLs were visited / fetched (list inline, comma-separated).\n"+
			" - KEY findings or decisions (factual results the agent will need later).\n"+
			" - ANY file_ids or KV keys the agent created — these are persistent state references the agent must keep.\n"+
			" - ANY errors or dead-ends that the agent should not re-try.\n"+
			"DO NOT include verbose HTTP headers, tool-call metadata, error stack traces, or repetitive content. "+
			"DO NOT add commentary or markdown headers. Output prose only.\n\n"+
			"%sTRANSCRIPT TO COMPRESS:\n%s",
		cfg.SummaryWordCap,
		prior,
		transcript,
	)

	resp, err := model.Generate(ctx, llm.Request{Messages: []llm.Message{llm.UserText(prompt)}})
	if err != nil {
		return "", fmt.Errorf("compactor: summarise LLM call: %w", err)
	}
	text := strings.TrimSpace(resp.Text())
	if text == "" {
		return "", errors.New("compactor: summarizer returned empty text")
	}
	return text, nil
}

// estimateTokens is the chars/4 heuristic over a message slice's text,
// tool calls, and tool results. Images count a flat ~1K tokens each.
// It intentionally matches the coarse estimator the old agentkit loop
// used; the 0.7 threshold ratio provides the safety margin.
func estimateTokens(msgs []llm.Message) int {
	chars := 0
	for _, m := range msgs {
		for _, p := range m.Parts {
			switch v := p.(type) {
			case llm.TextPart:
				chars += len(v.Text)
			case llm.ImagePart:
				chars += 4096
			default:
				// llm.Part is a sealed-but-extensible interface (future media
				// kinds). Count an unknown part conservatively (like an image)
				// rather than 0, so a transcript of unrecognised content can't
				// silently slip under the compaction threshold and 400 the
				// model. Bump this if a large new part kind lands.
				_ = v
				chars += 4096
			}
		}
		for _, tc := range m.ToolCalls {
			chars += len(tc.Name) + len(tc.Arguments)
		}
		for _, tr := range m.ToolResults {
			chars += len(tr.Content)
		}
	}
	return chars / 4
}

// transcriptMessageCap bounds individual message bodies at ~2KB so a
// single ultra-long tool result can't dominate the prompt sent to the
// summarizer.
const transcriptMessageCap = 2048

// secretBearingTools name tools whose ARGUMENTS routinely carry credentials or
// message bodies (bearer tokens, API keys, recipients, request bodies). Their
// args are dropped before the transcript reaches the summarizer model (which
// may be a different provider/tier than the run model), mirroring the redaction
// run/steps.go applies to user-facing step summaries. http_* and webhook_* are
// matched by prefix below.
var secretBearingTools = map[string]bool{
	"mcp_call":   true,
	"email_send": true,
}

// redactToolArgs returns a summariser-safe rendering of a tool call's args:
// "[redacted]" for known secret-bearing tools, the args verbatim otherwise.
func redactToolArgs(name, args string) string {
	if secretBearingTools[name] ||
		strings.HasPrefix(name, "http_") ||
		strings.HasPrefix(name, "webhook_") {
		return "[redacted]"
	}
	return args
}

// renderTranscript flattens a message slice to a plain-text transcript
// suitable for the summarisation prompt. Tool calls show name + (redacted) args,
// tool results show name + body. Empty fields are skipped.
//
// NOTE: tool-RESULT bodies are forwarded to the summarizer (the summary needs
// the findings). A host whose tool results may contain secrets and whose
// summarizer tier resolves to an untrusted provider should ensure that tier is
// trusted, or pre-sanitise results before they reach the agent loop.
func renderTranscript(msgs []llm.Message) string {
	var sb strings.Builder
	for i, m := range msgs {
		fmt.Fprintf(&sb, "---\n[%d] role=%s\n", i+1, m.Role)
		if text := m.Text(); text != "" {
			sb.WriteString(truncate(text, transcriptMessageCap))
			sb.WriteString("\n")
		}
		for _, tc := range m.ToolCalls {
			fmt.Fprintf(&sb, "tool_call name=%s args=%s\n", tc.Name, truncate(redactToolArgs(tc.Name, string(tc.Arguments)), transcriptMessageCap))
		}
		for _, tr := range m.ToolResults {
			fmt.Fprintf(&sb, "tool_result name=%s body=%s\n", tr.Name, truncate(tr.Content, transcriptMessageCap))
		}
	}
	return sb.String()
}
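
// truncate caps s at n bytes and marks the cut. It backs up to a rune
// boundary so a multi-byte UTF-8 character is never split in half.
func truncate(s string, n int) string {
	if len(s) <= n {
		return s
	}
	// Step back over UTF-8 continuation bytes (bit pattern 10xxxxxx).
	for n > 0 && s[n]&0xC0 == 0x80 {
		n--
	}
	return s[:n] + "...(truncated)"
}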