P2: move compactor -> compact/ + step instrumentation -> run/steps.go
- compact/compactor.go: the per-run stateful context compactor (token-threshold gate, fast-tier middle summarisation, fold memory) lifted from mort's skillexec/compactor.go. Self-contained; its only dependency is a ModelResolver func (model.ParseModelForContext satisfies it) + a token threshold. - run/steps.go: the step-emission/instrumentation (stepEmitter, tool->kind/ summary mapping with redaction, Result.Steps accumulation) from agentexec, repointed onto executus/tool. Both build green. With the run-loop mechanics, RunnableAgent DTO, run.Ports, compactor, and step instrumentation now all in place, the remaining P2 work is the run.Executor itself (wiring these + majordomo's agent loop), which makes executus runnable. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -0,0 +1,331 @@
|
||||
// V15.2 context compactor (re-based on majordomo).
|
||||
//
|
||||
// Why: the agent loop accumulates tool results indefinitely. A
|
||||
// research-heavy run with many web_search / read_page / http_get
|
||||
// results easily crosses 200K tokens and trips the model's HTTP-400
|
||||
// "prompt too long" rejection mid-run (observed at 410K tokens on
|
||||
// qwen3-coder:480b which has a 262K cap). majordomo's agent loop calls
|
||||
// the compactor with the full message slice before every model call;
|
||||
// the compactor returns a shorter slice that preserves the system
|
||||
// prompt + recent messages, with the middle range replaced by a
|
||||
// synthetic summary.
|
||||
//
|
||||
// Strategy (unchanged from the agentkit era):
|
||||
// - Keep any leading system message verbatim. (Under majordomo the
|
||||
// system prompt normally travels in Request.System, not in the
|
||||
// message slice, so this is defensive.)
|
||||
// - Keep the last KeepRecent messages verbatim. This ensures the
|
||||
// agent has fresh tool state to act on; compacting too aggressively
|
||||
// would strip the in-flight context it needs to make the next
|
||||
// decision.
|
||||
// - Compress the middle range via a single fast-tier LLM call that
|
||||
// receives the middle messages as raw text and produces a paragraph
|
||||
// summary (URLs visited, key findings, file_ids created, what the
|
||||
// agent is trying to accomplish).
|
||||
// - Replace the middle range with one synthetic user-role message
|
||||
// containing the summary. (user-role chosen because tool-result-role
|
||||
// would be ambiguous without a matching tool_call_id.)
|
||||
//
|
||||
// What moved in the majordomo conversion:
|
||||
// - The token-threshold gate lives HERE now. agentkit estimated
|
||||
// tokens and only invoked the compactor past a configured
|
||||
// threshold; majordomo's hook fires before every model call, so
|
||||
// the threshold check (estimateTokens vs the per-run threshold the
|
||||
// executor computes from the model's context limit) is the
|
||||
// compactor's first step.
|
||||
// - The compactor is per-run STATEFUL: majordomo does not replace
|
||||
// the loop's internal transcript with the compacted slice (the
|
||||
// hook shapes only what is SENT), so without memory the middle
|
||||
// would be re-summarised from scratch on every step past the
|
||||
// threshold. The state remembers how far the transcript has been
|
||||
// folded into the running summary and folds the previous summary
|
||||
// into the next one instead of re-paying for it.
|
||||
//
|
||||
// Failure path: any error (LLM unavailable, malformed response, etc.)
|
||||
// is returned to the agent loop, which treats compactor errors as
|
||||
// non-fatal and sends the original slice — if the next model call hits
|
||||
// the provider's limit, the existing HTTP-400 error path takes over.
|
||||
|
||||
package compact
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"gitea.stevedudenhoeffer.com/steve/majordomo/llm"
|
||||
)
|
||||
|
||||
// ModelResolver resolves a tier/spec to a usable llm.Model (and an enriched
|
||||
// context for usage attribution). model.ParseModelForContext satisfies it.
|
||||
type ModelResolver func(ctx context.Context, tier string) (context.Context, llm.Model, error)
|
||||
|
||||
// Compactor is the per-run compaction hook handed to the agent loop
|
||||
// (matches the signature agent.WithCompactor expects).
|
||||
type Compactor = func(ctx context.Context, msgs []llm.Message) ([]llm.Message, error)
|
||||
|
||||
// CompactionEvent describes one fired compaction so the executor can
|
||||
// log it to skill_run_logs ("compaction_fired").
|
||||
type CompactionEvent struct {
|
||||
// MessagesBefore/After count the messages that would have been
|
||||
// sent without/with this compaction.
|
||||
MessagesBefore int
|
||||
MessagesAfter int
|
||||
// TokensBefore/After are the estimateTokens values for the same
|
||||
// two slices.
|
||||
TokensBefore int
|
||||
TokensAfter int
|
||||
}
|
||||
|
||||
// CompactorFactory mints a fresh per-run Compactor bound to a token
|
||||
// threshold. onFire (nil-safe) observes every compaction that actually
|
||||
// fires. A non-positive threshold yields a pass-through compactor.
|
||||
type CompactorFactory func(thresholdTokens int, onFire func(CompactionEvent)) Compactor
|
||||
|
||||
// CompactorConfig controls the v15.2 compactor's behaviour. Construct
|
||||
// in mort.go from convars + the executor's ModelResolver.
|
||||
type CompactorConfig struct {
|
||||
// Models resolves a model spec (typically "fast" or a specific tier)
|
||||
// to an llm.Model. Required; nil disables compaction.
|
||||
Models ModelResolver
|
||||
|
||||
// SummarizerTier is the model tier used for the compression LLM call.
|
||||
// Production default "fast"; an admin may set "haiku" or a specific
|
||||
// model spec. Empty falls back to "fast".
|
||||
SummarizerTier string
|
||||
|
||||
// KeepRecent is the number of trailing messages preserved verbatim.
|
||||
// Default 8. Lower values compact more aggressively (the next loop
|
||||
// iteration sees less recent context); higher values keep more.
|
||||
KeepRecent int
|
||||
|
||||
// SummaryWordCap bounds the LLM-generated summary length. Default
|
||||
// 200 words ≈ 800 chars ≈ 200 tokens — small enough that the
|
||||
// compaction always shrinks the slice meaningfully.
|
||||
SummaryWordCap int
|
||||
}
|
||||
|
||||
// NewCompactor returns a CompactorFactory implementing the middle-range
|
||||
// summarisation strategy. nil cfg.Models returns a factory of no-op
|
||||
// compactors that always return the input unchanged (degrades to
|
||||
// v15.1 behaviour).
|
||||
func NewCompactor(cfg CompactorConfig) CompactorFactory {
|
||||
if cfg.Models == nil {
|
||||
return func(int, func(CompactionEvent)) Compactor {
|
||||
return func(_ context.Context, msgs []llm.Message) ([]llm.Message, error) {
|
||||
return msgs, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
if cfg.SummarizerTier == "" {
|
||||
cfg.SummarizerTier = "fast"
|
||||
}
|
||||
if cfg.KeepRecent <= 0 {
|
||||
cfg.KeepRecent = 8
|
||||
}
|
||||
if cfg.SummaryWordCap <= 0 {
|
||||
cfg.SummaryWordCap = 200
|
||||
}
|
||||
return func(threshold int, onFire func(CompactionEvent)) Compactor {
|
||||
st := &compactionState{}
|
||||
return func(ctx context.Context, msgs []llm.Message) ([]llm.Message, error) {
|
||||
return compactIfNeeded(ctx, cfg, st, threshold, onFire, msgs)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// compactionState is the per-run fold memory: msgs[:prefixEnd]
|
||||
// (excluding a leading system message) are represented by summaryText
|
||||
// in the rendered slice. Guarded by mu for safety although the agent
|
||||
// loop invokes the hook from a single goroutine.
|
||||
type compactionState struct {
|
||||
mu sync.Mutex
|
||||
prefixEnd int
|
||||
summaryText string
|
||||
}
|
||||
|
||||
// compactIfNeeded is the workhorse: render the transcript with any
|
||||
// existing fold applied, check the threshold, and fold more of the
|
||||
// middle into the summary when the rendered size still exceeds it.
|
||||
func compactIfNeeded(ctx context.Context, cfg CompactorConfig, st *compactionState,
|
||||
threshold int, onFire func(CompactionEvent), msgs []llm.Message) ([]llm.Message, error) {
|
||||
st.mu.Lock()
|
||||
defer st.mu.Unlock()
|
||||
|
||||
rendered := renderCompacted(st, msgs)
|
||||
if threshold <= 0 {
|
||||
return rendered, nil
|
||||
}
|
||||
tokensBefore := estimateTokens(rendered)
|
||||
if tokensBefore < threshold {
|
||||
return rendered, nil
|
||||
}
|
||||
|
||||
// Determine the new middle range to fold: everything between what
|
||||
// is already summarised (or the optional leading system message)
|
||||
// and the KeepRecent tail.
|
||||
startMiddle := st.prefixEnd
|
||||
if startMiddle == 0 && len(msgs) > 0 && msgs[0].Role == llm.RoleSystem {
|
||||
startMiddle = 1
|
||||
}
|
||||
endMiddle := len(msgs) - cfg.KeepRecent
|
||||
if endMiddle <= startMiddle {
|
||||
// Nothing new to fold (the tail alone exceeds the threshold).
|
||||
// Return the rendered slice; the model call may still succeed.
|
||||
return rendered, nil
|
||||
}
|
||||
middle := msgs[startMiddle:endMiddle]
|
||||
|
||||
summary, err := summariseMiddle(ctx, cfg, st.summaryText, middle)
|
||||
if err != nil {
|
||||
// Non-fatal upstream: the agent loop sends the original slice.
|
||||
return rendered, fmt.Errorf("compactor: summarise middle: %w", err)
|
||||
}
|
||||
st.summaryText = summary
|
||||
st.prefixEnd = endMiddle
|
||||
|
||||
out := renderCompacted(st, msgs)
|
||||
if onFire != nil {
|
||||
onFire(CompactionEvent{
|
||||
MessagesBefore: len(rendered),
|
||||
MessagesAfter: len(out),
|
||||
TokensBefore: tokensBefore,
|
||||
TokensAfter: estimateTokens(out),
|
||||
})
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// renderCompacted applies the fold state to msgs: [optional system] +
|
||||
// [synthetic summary] + msgs[prefixEnd:]. With no fold yet, msgs is
|
||||
// returned unchanged.
|
||||
func renderCompacted(st *compactionState, msgs []llm.Message) []llm.Message {
|
||||
if st.prefixEnd <= 0 || st.prefixEnd > len(msgs) {
|
||||
return msgs
|
||||
}
|
||||
tail := msgs[st.prefixEnd:]
|
||||
out := make([]llm.Message, 0, len(tail)+2)
|
||||
if msgs[0].Role == llm.RoleSystem {
|
||||
out = append(out, msgs[0])
|
||||
}
|
||||
out = append(out, llm.UserText(
|
||||
"[CONTEXT COMPACTED] The earlier portion of this conversation was summarised "+
|
||||
"to fit the model's context window. Summary:\n\n"+st.summaryText+
|
||||
"\n\nResume from the recent messages below."))
|
||||
out = append(out, tail...)
|
||||
return out
|
||||
}
|
||||
|
||||
// summariseMiddle composes a "compress this transcript" prompt and
|
||||
// fires one fast-tier LLM call. prevSummary (may be empty) is the
|
||||
// running summary from earlier compactions; it is folded into the new
|
||||
// summary so prior context is not lost.
|
||||
func summariseMiddle(ctx context.Context, cfg CompactorConfig, prevSummary string, middle []llm.Message) (string, error) {
|
||||
if len(middle) == 0 {
|
||||
return "", errors.New("compactor: empty middle range")
|
||||
}
|
||||
modelCtx, model, err := cfg.Models(ctx, cfg.SummarizerTier)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("compactor: resolve summarizer model %q: %w", cfg.SummarizerTier, err)
|
||||
}
|
||||
if model == nil {
|
||||
return "", errors.New("compactor: summarizer model resolved to nil")
|
||||
}
|
||||
if modelCtx != nil {
|
||||
ctx = modelCtx
|
||||
}
|
||||
|
||||
var prior string
|
||||
if strings.TrimSpace(prevSummary) != "" {
|
||||
prior = "AN EARLIER PORTION WAS ALREADY SUMMARISED AS:\n" + prevSummary +
|
||||
"\n\nFold that summary into your new one — its facts must survive.\n\n"
|
||||
}
|
||||
transcript := renderTranscript(middle)
|
||||
prompt := fmt.Sprintf(
|
||||
"You are compressing an in-flight agent's conversation transcript so the agent "+
|
||||
"can continue working without blowing its model context. The transcript below is "+
|
||||
"a sequence of tool calls and their results. Produce a single paragraph (under %d words) "+
|
||||
"that captures:\n"+
|
||||
" - WHAT the agent has been trying to accomplish.\n"+
|
||||
" - WHICH URLs were visited / fetched (list inline, comma-separated).\n"+
|
||||
" - KEY findings or decisions (factual results the agent will need later).\n"+
|
||||
" - ANY file_ids or KV keys the agent created — these are persistent state references the agent must keep.\n"+
|
||||
" - ANY errors or dead-ends that the agent should not re-try.\n"+
|
||||
"DO NOT include verbose HTTP headers, tool-call metadata, error stack traces, or repetitive content. "+
|
||||
"DO NOT add commentary or markdown headers. Output prose only.\n\n"+
|
||||
"%sTRANSCRIPT TO COMPRESS:\n%s",
|
||||
cfg.SummaryWordCap,
|
||||
prior,
|
||||
transcript,
|
||||
)
|
||||
|
||||
resp, err := model.Generate(ctx, llm.Request{Messages: []llm.Message{llm.UserText(prompt)}})
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("compactor: summarise LLM call: %w", err)
|
||||
}
|
||||
text := strings.TrimSpace(resp.Text())
|
||||
if text == "" {
|
||||
return "", errors.New("compactor: summarizer returned empty text")
|
||||
}
|
||||
return text, nil
|
||||
}
|
||||
|
||||
// estimateTokens is the chars/4 heuristic over a message slice's text,
|
||||
// tool calls, and tool results. Images count a flat ~1K tokens each.
|
||||
// It intentionally matches the coarse estimator the old agentkit loop
|
||||
// used — the 0.7 threshold ratio provides the safety margin.
|
||||
func estimateTokens(msgs []llm.Message) int {
|
||||
chars := 0
|
||||
for _, m := range msgs {
|
||||
for _, p := range m.Parts {
|
||||
switch v := p.(type) {
|
||||
case llm.TextPart:
|
||||
chars += len(v.Text)
|
||||
case llm.ImagePart:
|
||||
chars += 4096
|
||||
}
|
||||
}
|
||||
for _, tc := range m.ToolCalls {
|
||||
chars += len(tc.Name) + len(tc.Arguments)
|
||||
}
|
||||
for _, tr := range m.ToolResults {
|
||||
chars += len(tr.Content)
|
||||
}
|
||||
}
|
||||
return chars / 4
|
||||
}
|
||||
|
||||
// transcriptMessageCap bounds individual message bodies at ~2KB so a
|
||||
// single ultra-long tool result can't dominate the prompt sent to the
|
||||
// summarizer.
|
||||
const transcriptMessageCap = 2048
|
||||
|
||||
// renderTranscript flattens a message slice to a plain-text transcript
|
||||
// suitable for the summarisation prompt. Tool calls show name + args,
|
||||
// tool results show name + body. Empty fields are skipped.
|
||||
func renderTranscript(msgs []llm.Message) string {
|
||||
var sb strings.Builder
|
||||
for i, m := range msgs {
|
||||
fmt.Fprintf(&sb, "---\n[%d] role=%s\n", i+1, m.Role)
|
||||
if text := m.Text(); text != "" {
|
||||
sb.WriteString(truncate(text, transcriptMessageCap))
|
||||
sb.WriteString("\n")
|
||||
}
|
||||
for _, tc := range m.ToolCalls {
|
||||
fmt.Fprintf(&sb, "tool_call name=%s args=%s\n", tc.Name, truncate(string(tc.Arguments), transcriptMessageCap))
|
||||
}
|
||||
for _, tr := range m.ToolResults {
|
||||
fmt.Fprintf(&sb, "tool_result name=%s body=%s\n", tr.Name, truncate(tr.Content, transcriptMessageCap))
|
||||
}
|
||||
}
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
func truncate(s string, n int) string {
|
||||
if len(s) <= n {
|
||||
return s
|
||||
}
|
||||
return s[:n] + "...(truncated)"
|
||||
}
|
||||
Reference in New Issue
Block a user