// kv_set is the v4 KV-storage write tool. It upserts (scope, key) → // value within the calling skill's namespace, with optional TTL. // // Per-value cap: the constructor takes maxValueBytes (typically read // from convar `skills.storage.kv_max_value_bytes`); 0 means use the // 64 KiB default. // // Per-skill quota (sum across all rows): the constructor's QuotaProvider // arg drives the v4 Phase 4 enforcement. nil disables enforcement // (useful for tests and admin-only deployments). The check is: // // used := storage.KVUsageBytes(skill) // delta := len(new value) - len(prior value if updating same key) // if used + delta > kvMax → quota_exceeded // // We subtract the existing value's size on UPDATE so an in-place edit // of a hot key never trips the cap unless the new value is larger. package tools import ( "context" "encoding/json" "errors" "fmt" "time" "gitea.stevedudenhoeffer.com/steve/executus/tool" ) const defaultKVMaxValueBytes = 65536 // 64 KiB type kvSetArgs struct { Scope string `json:"scope" description:"Storage scope: 'skill', 'user:', 'run:', or 'root_run:' (shared across the whole dispatch tree)."` Key string `json:"key" description:"Key within the scope."` Value json.RawMessage `json:"value" description:"JSON value to store. Must parse as valid JSON (object, array, string, number, bool, or null)."` TTLSeconds *int `json:"ttl_seconds,omitempty" description:"Optional TTL in seconds. The entry expires (and is lazy-purged on read) after this duration."` } // NewKVSet constructs the kv_set tool. // // storage nil → "not configured" at execute time. // maxValueBytes <= 0 falls back to defaultKVMaxValueBytes. // quota nil → per-skill quota check is skipped (per-value cap still // applies). func NewKVSet(storage KVStorage, quota QuotaProvider, maxValueBytes int) tool.Tool { if maxValueBytes <= 0 { maxValueBytes = defaultKVMaxValueBytes } return tool.NewGatedTool[kvSetArgs]( "kv_set", "Set a value at the given scope+key. Optionally with a TTL after which the entry auto-expires.", tool.Permission{ AuthoringRequirement: tool.RequirementAnyone, OperatesOn: tool.ScopeCaller, SafeForShare: true, Categories: []string{"storage", "write"}, }, func(ctx context.Context, inv tool.Invocation, args kvSetArgs) (string, error) { if storage == nil { return "", fmt.Errorf("kv_set: not configured") } if err := ValidateScope(inv, args.Scope, false); err != nil { return "", fmt.Errorf("kv_set: %w", err) } if args.Key == "" { return "", fmt.Errorf("kv_set: key required") } if len(args.Value) == 0 { return "", fmt.Errorf("kv_set: value required") } if len(args.Value) > maxValueBytes { return "", fmt.Errorf("kv_set: value exceeds max %d bytes (got %d)", maxValueBytes, len(args.Value)) } // Validate JSON. The storage layer treats the raw bytes as // opaque, but the LLM contract says "value is a JSON value" // — surfacing a parse error here gives a friendlier message // than letting an invalid blob round-trip and confuse the // reader on a future kv_get. var probe any if err := json.Unmarshal(args.Value, &probe); err != nil { return "", fmt.Errorf("kv_set: value is not valid JSON: %w", err) } partition := kvPartition(inv, args.Scope) // Per-skill quota gate (v4 Phase 4). Skipped when quota is nil // (tests / admin opt-out) so the per-value cap above is the // only line of defence in that mode. Also skipped for the // shared root_run partition — per-skill quota attribution is // meaningless across the sentinel; the per-value cap above + // the run-scope sweeper bound that partition's growth. if quota != nil && partition == inv.SkillID { kvMax, _, err := quota.EffectiveQuota(ctx, inv.SkillID) if err != nil { return "", fmt.Errorf("kv_set: quota lookup: %w", err) } used, err := storage.KVUsageBytes(ctx, inv.SkillID) if err != nil { return "", fmt.Errorf("kv_set: usage check: %w", err) } delta := int64(len(args.Value)) // On UPDATE, subtract the prior value's size so an // in-place edit of a hot key doesn't double-count. A // brand-new key (KVGet returns ErrKVNotFound) leaves // delta untouched. if existing, getErr := storage.KVGet(ctx, inv.SkillID, args.Scope, args.Key); getErr == nil && existing != nil { delta -= int64(len(existing.Value)) } else if getErr != nil && !errors.Is(getErr, ErrKVNotFound) { return "", fmt.Errorf("kv_set: pre-write lookup: %w", getErr) } if used+delta > kvMax { return "", fmt.Errorf("kv_set: quota_exceeded — %d/%d bytes used; ask admin for higher quota", used, kvMax) } } now := time.Now() entry := KVDomainEntry{ SkillID: partition, Scope: args.Scope, Key: args.Key, Value: args.Value, CreatedAt: now, UpdatedAt: now, } if args.TTLSeconds != nil && *args.TTLSeconds > 0 { expires := now.Add(time.Duration(*args.TTLSeconds) * time.Second) entry.ExpiresAt = &expires } if err := storage.KVSet(ctx, entry); err != nil { return "", fmt.Errorf("kv_set: %w", err) } // V7 versioned KV history (admin diagnostic). Best-effort — // a failed history write must NOT shadow the successful // kv_set return, so we ignore the error after logging. // Production adapter satisfies KVHistoryRecorder; tests // using a bare KVStorage skip this branch entirely. if h, ok := storage.(KVHistoryRecorder); ok && h != nil { _ = h.RecordKVHistory(ctx, partition, args.Scope, args.Key, []byte(args.Value), inv.CallerID) } return "ok", nil }, ) }