Files
executus/skillpack/sync.go
steve 9bb5d143f7
executus CI / test (pull_request) Successful in 3m30s
fix(skillpack): address review — symlink read, git arg-injection, dup-subscribe, nil panics
Real issues from the PR review:
- security: readTree now skips symlinks (a pack with SKILL.md -> /etc/passwd
  or scripts/x -> ../../.ssh/id_rsa could read host files); covers file and
  dir symlinks, incl. within a git subpath
- security: GitSource rejects url/ref beginning with '-' (git arg injection)
  and clones with '--' separator; --filter=blob:none (blobless partial clone)
  instead of full-history clone
- correctness: Subscribe no longer swallows a non-ErrNotFound store error from
  GetByName (would create a duplicate subscription); handles *GitSource as well
  as GitSource in the URL/subpath extraction
- correctness: pinTo no longer renames a subscription, so Apply can't silently
  collide two subscriptions when an upstream pack changes its name
- validation: isKebab rejects leading/trailing/consecutive hyphens; BOM-
  prefixed SKILL.md now parses (matches the doc comment)
- robustness: Catalog/Activate/renderPackBody/Stage guard nil/malformed packs
- test cleanup: Syncer.Store field renamed Cache (collided with the Store
  interface); test NewID returns distinct ids
- tests: symlink-skip, BOM, strict-kebab, nil-pack-safety

Deferred (advisory perf, documented): PackCache stores raw trees so activation
re-parses; CheckAll is serial. Both fine at expected scale.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-07-04 20:41:44 -04:00

204 lines
6.4 KiB
Go

package skillpack
import (
"context"
"errors"
"fmt"
"time"
"github.com/google/uuid"
)
// Syncer ties a Store, a PackCache, and Sources together into the subscription
// lifecycle: subscribe (initial pin), check (record a PENDING update, never move
// the pin), and apply (the explicit re-pin). It owns the supply-chain invariant
// — the only call that changes the bytes an agent runs is Apply, always with an
// actor recorded.
type Syncer struct {
Cache PackCache // content store for pinned trees
Subs Store // subscription metadata store
// SourceFor builds the Source for a stored subscription. A host overrides
// this to enforce its allow-list (reject a disallowed URL/kind before any
// fetch). Nil uses DefaultSourceFor (dir + git, no allow-list).
SourceFor func(*Subscription) (Source, error)
// Now/NewID are injectable for deterministic tests.
Now func() time.Time
NewID func() string
}
func (y *Syncer) now() time.Time {
if y.Now != nil {
return y.Now()
}
return time.Now()
}
func (y *Syncer) newID() string {
if y.NewID != nil {
return y.NewID()
}
return uuid.NewString()
}
func (y *Syncer) sourceFor(s *Subscription) (Source, error) {
if y.SourceFor != nil {
return y.SourceFor(s)
}
return DefaultSourceFor(s)
}
// DefaultSourceFor reconstructs a Source from a subscription's stored
// coordinates, with no allow-list. A host that cares about which sources are
// permitted should set Syncer.SourceFor instead of using this.
func DefaultSourceFor(s *Subscription) (Source, error) {
switch s.SourceKind {
case "dir":
return DirSource{Path: s.SourceURL}, nil
case "git":
return GitSource{URL: s.SourceURL, Subpath: s.Subpath}, nil
default:
return nil, fmt.Errorf("skillpack: unknown source kind %q", s.SourceKind)
}
}
// fetchPack fetches src at ref, caches the resulting tree, and returns the
// parsed pack plus the source's resolved ref.
func (y *Syncer) fetchPack(ctx context.Context, src Source, ref string) (*Pack, string, error) {
tree, sourceRef, err := src.Fetch(ctx, ref)
if err != nil {
return nil, "", err
}
pack, err := LoadPack(tree)
if err != nil {
return nil, "", err
}
if err := y.Cache.Put(ctx, pack.Digest, pack.Tree); err != nil {
return nil, "", err
}
return pack, sourceRef, nil
}
// Subscribe fetches a pack from src at trackRef, caches it, and persists a new
// Subscription pinned to that exact content, attributed to by. It rejects a
// second subscription to the same pack name.
func (y *Syncer) Subscribe(ctx context.Context, src Source, trackRef, by string) (*Subscription, error) {
pack, sourceRef, err := y.fetchPack(ctx, src, trackRef)
if err != nil {
return nil, err
}
existing, err := y.Subs.GetByName(ctx, pack.Manifest.Name)
if err == nil {
return nil, fmt.Errorf("skillpack: already subscribed to %q (id %s)", pack.Manifest.Name, existing.ID)
}
if !errors.Is(err, ErrNotFound) {
// A transient store error must NOT fall through to creating a row — that
// would produce a duplicate subscription the uniqueness check missed.
return nil, fmt.Errorf("skillpack: checking for existing subscription %q: %w", pack.Manifest.Name, err)
}
sub := &Subscription{
ID: y.newID(),
Name: pack.Manifest.Name,
SourceKind: src.Kind(),
SourceURL: src.String(),
TrackRef: trackRef,
Enabled: true,
}
// Store the raw URL + subpath separately (String() may combine them for
// display). GitSource methods have value receivers, so a caller may pass
// either GitSource or *GitSource — handle both.
switch gs := src.(type) {
case GitSource:
sub.SourceURL, sub.Subpath = gs.URL, gs.Subpath
case *GitSource:
sub.SourceURL, sub.Subpath = gs.URL, gs.Subpath
}
sub.pinTo(pack, sourceRef, by, y.now())
if err := y.Subs.Save(ctx, sub); err != nil {
return nil, err
}
return sub, nil
}
// Check fetches the subscription's tracked ref and, if the content digest
// differs from the current pin, caches the new tree and records it as PENDING —
// it never moves the pin. If the tracked ref matches the pin, any stale pending
// state is cleared. The updated subscription is saved and returned.
func (y *Syncer) Check(ctx context.Context, id string) (*Subscription, error) {
sub, err := y.Subs.Get(ctx, id)
if err != nil {
return nil, err
}
src, err := y.sourceFor(sub)
if err != nil {
return nil, err
}
pack, sourceRef, err := y.fetchPack(ctx, src, sub.TrackRef)
if err != nil {
return nil, err
}
if pack.Digest == sub.PinnedDigest {
// No change upstream; drop any previously-recorded pending update.
sub.PendingDigest, sub.PendingSourceRef, sub.PendingAt = "", "", time.Time{}
} else {
sub.PendingDigest = pack.Digest
sub.PendingSourceRef = sourceRef
sub.PendingAt = y.now()
}
if err := y.Subs.Save(ctx, sub); err != nil {
return nil, err
}
return sub, nil
}
// CheckAll runs Check on every subscription and returns the ones that now have a
// pending update. Errors on individual subscriptions are collected, not fatal —
// one unreachable source shouldn't stop the sweep. A host calls this on its own
// ticker (skillpack has no cron opinion; the update is never auto-applied so the
// cadence only affects how fresh the "pending" signal is).
func (y *Syncer) CheckAll(ctx context.Context) (pending []Subscription, errs []error) {
subs, err := y.Subs.List(ctx)
if err != nil {
return nil, []error{err}
}
for i := range subs {
updated, err := y.Check(ctx, subs[i].ID)
if err != nil {
errs = append(errs, fmt.Errorf("skillpack: check %q: %w", subs[i].Name, err))
continue
}
if updated.HasPending() {
pending = append(pending, *updated)
}
}
return pending, errs
}
// Apply promotes a subscription's pending update to the active pin, attributed
// to by. This is the ONLY call that changes what an agent runs. It errors if
// there is no pending update or the pending tree is missing from the cache.
func (y *Syncer) Apply(ctx context.Context, id, by string) (*Subscription, error) {
sub, err := y.Subs.Get(ctx, id)
if err != nil {
return nil, err
}
if !sub.HasPending() {
return nil, fmt.Errorf("skillpack: %q has no pending update to apply", sub.Name)
}
tree, err := y.Cache.Get(ctx, sub.PendingDigest)
if err != nil {
return nil, fmt.Errorf("skillpack: pending tree for %q missing from cache: %w", sub.Name, err)
}
pack, err := LoadPack(tree)
if err != nil {
return nil, err
}
sub.pinTo(pack, sub.PendingSourceRef, by, y.now())
if err := y.Subs.Save(ctx, sub); err != nil {
return nil, err
}
return sub, nil
}