package skillpack import ( "context" "errors" "fmt" "time" "github.com/google/uuid" ) // Syncer ties a Store, a PackCache, and Sources together into the subscription // lifecycle: subscribe (initial pin), check (record a PENDING update, never move // the pin), and apply (the explicit re-pin). It owns the supply-chain invariant // — the only call that changes the bytes an agent runs is Apply, always with an // actor recorded. type Syncer struct { Cache PackCache // content store for pinned trees Subs Store // subscription metadata store // SourceFor builds the Source for a stored subscription. A host overrides // this to enforce its allow-list (reject a disallowed URL/kind before any // fetch). Nil uses DefaultSourceFor (dir + git, no allow-list). SourceFor func(*Subscription) (Source, error) // Now/NewID are injectable for deterministic tests. Now func() time.Time NewID func() string } func (y *Syncer) now() time.Time { if y.Now != nil { return y.Now() } return time.Now() } func (y *Syncer) newID() string { if y.NewID != nil { return y.NewID() } return uuid.NewString() } func (y *Syncer) sourceFor(s *Subscription) (Source, error) { if y.SourceFor != nil { return y.SourceFor(s) } return DefaultSourceFor(s) } // DefaultSourceFor reconstructs a Source from a subscription's stored // coordinates, with no allow-list. A host that cares about which sources are // permitted should set Syncer.SourceFor instead of using this. func DefaultSourceFor(s *Subscription) (Source, error) { switch s.SourceKind { case "dir": return DirSource{Path: s.SourceURL}, nil case "git": return GitSource{URL: s.SourceURL, Subpath: s.Subpath}, nil default: return nil, fmt.Errorf("skillpack: unknown source kind %q", s.SourceKind) } } // fetchPack fetches src at ref, caches the resulting tree, and returns the // parsed pack plus the source's resolved ref. func (y *Syncer) fetchPack(ctx context.Context, src Source, ref string) (*Pack, string, error) { tree, sourceRef, err := src.Fetch(ctx, ref) if err != nil { return nil, "", err } pack, err := LoadPack(tree) if err != nil { return nil, "", err } if err := y.Cache.Put(ctx, pack.Digest, pack.Tree); err != nil { return nil, "", err } return pack, sourceRef, nil } // Subscribe fetches a pack from src at trackRef, caches it, and persists a new // Subscription pinned to that exact content, attributed to by. It rejects a // second subscription to the same pack name. func (y *Syncer) Subscribe(ctx context.Context, src Source, trackRef, by string) (*Subscription, error) { pack, sourceRef, err := y.fetchPack(ctx, src, trackRef) if err != nil { return nil, err } existing, err := y.Subs.GetByName(ctx, pack.Manifest.Name) if err == nil { return nil, fmt.Errorf("skillpack: already subscribed to %q (id %s)", pack.Manifest.Name, existing.ID) } if !errors.Is(err, ErrNotFound) { // A transient store error must NOT fall through to creating a row — that // would produce a duplicate subscription the uniqueness check missed. return nil, fmt.Errorf("skillpack: checking for existing subscription %q: %w", pack.Manifest.Name, err) } sub := &Subscription{ ID: y.newID(), Name: pack.Manifest.Name, SourceKind: src.Kind(), SourceURL: src.String(), TrackRef: trackRef, Enabled: true, } // Store the raw URL + subpath separately (String() may combine them for // display). GitSource methods have value receivers, so a caller may pass // either GitSource or *GitSource — handle both. switch gs := src.(type) { case GitSource: sub.SourceURL, sub.Subpath = gs.URL, gs.Subpath case *GitSource: sub.SourceURL, sub.Subpath = gs.URL, gs.Subpath } sub.pinTo(pack, sourceRef, by, y.now()) if err := y.Subs.Save(ctx, sub); err != nil { return nil, err } return sub, nil } // Check fetches the subscription's tracked ref and, if the content digest // differs from the current pin, caches the new tree and records it as PENDING — // it never moves the pin. If the tracked ref matches the pin, any stale pending // state is cleared. The updated subscription is saved and returned. func (y *Syncer) Check(ctx context.Context, id string) (*Subscription, error) { sub, err := y.Subs.Get(ctx, id) if err != nil { return nil, err } src, err := y.sourceFor(sub) if err != nil { return nil, err } pack, sourceRef, err := y.fetchPack(ctx, src, sub.TrackRef) if err != nil { return nil, err } if pack.Digest == sub.PinnedDigest { // No change upstream; drop any previously-recorded pending update. sub.PendingDigest, sub.PendingSourceRef, sub.PendingAt = "", "", time.Time{} } else { sub.PendingDigest = pack.Digest sub.PendingSourceRef = sourceRef sub.PendingAt = y.now() } if err := y.Subs.Save(ctx, sub); err != nil { return nil, err } return sub, nil } // CheckAll runs Check on every subscription and returns the ones that now have a // pending update. Errors on individual subscriptions are collected, not fatal — // one unreachable source shouldn't stop the sweep. A host calls this on its own // ticker (skillpack has no cron opinion; the update is never auto-applied so the // cadence only affects how fresh the "pending" signal is). func (y *Syncer) CheckAll(ctx context.Context) (pending []Subscription, errs []error) { subs, err := y.Subs.List(ctx) if err != nil { return nil, []error{err} } for i := range subs { updated, err := y.Check(ctx, subs[i].ID) if err != nil { errs = append(errs, fmt.Errorf("skillpack: check %q: %w", subs[i].Name, err)) continue } if updated.HasPending() { pending = append(pending, *updated) } } return pending, errs } // Apply promotes a subscription's pending update to the active pin, attributed // to by. This is the ONLY call that changes what an agent runs. It errors if // there is no pending update or the pending tree is missing from the cache. func (y *Syncer) Apply(ctx context.Context, id, by string) (*Subscription, error) { sub, err := y.Subs.Get(ctx, id) if err != nil { return nil, err } if !sub.HasPending() { return nil, fmt.Errorf("skillpack: %q has no pending update to apply", sub.Name) } tree, err := y.Cache.Get(ctx, sub.PendingDigest) if err != nil { return nil, fmt.Errorf("skillpack: pending tree for %q missing from cache: %w", sub.Name, err) } pack, err := LoadPack(tree) if err != nil { return nil, err } sub.pinTo(pack, sub.PendingSourceRef, by, y.now()) if err := y.Subs.Save(ctx, sub); err != nil { return nil, err } return sub, nil }