From c8a87f173370c4b783c27627edc76425986ae91c Mon Sep 17 00:00:00 2001 From: Steve Dudenhoeffer Date: Fri, 26 Jun 2026 23:37:37 -0400 Subject: [PATCH] fix: address verified gadfly P5/#5 findings (contrib/store concurrency) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit All 3 cloud models converged on real concurrency bugs in the SQLite stores: - AppendVersion (HIGH): the seq key was `SELECT MAX(seq)+1` then INSERT in two un-transacted statements with a NON-unique index, AND the Scan error was swallowed (seq stayed 0 on failure). Concurrent appends could both land the same seq, silently breaking newest-first ordering. Now: one transaction, the Scan error is propagated, the (skill_id, seq) index is UNIQUE (the loser of a race fails loudly), and an empty SkillID is rejected. - MarkScheduledRun / MarkAgentScheduledRun (all 3): replaced the Get→mutate→Save read-modify-write (lost-update window) with a single atomic UPDATE using json_set, so a concurrent Mark/edit can't clobber it. json_set keeps the JSON blob's NextRunAt/LastScheduledRunAt consistent with the indexed column; RFC3339Nano matches Go's time encoding so the blob still round-trips (tested). - Open: actually applies PRAGMA busy_timeout=5000 (the doc advertised it but it was never set) — a contended writer waits instead of erroring SQLITE_BUSY. - budgetStore.Add: rejects NaN/Inf secondsUsed (would irrecoverably poison the column). Triaged-but-kept: plaintext webhook secret (documented design, high-entropy URL key, pre-existing); SQL()/free-form `where` helpers (no untrusted input reaches them — defense-in-depth notes only). Core go.sum still free of host/DB deps; contrib/store green (incl. a json_set blob-round-trip test). Co-Authored-By: Claude Opus 4.8 (1M context) --- contrib/store/budget_store.go | 6 ++++ contrib/store/persona_store.go | 17 +++++++--- contrib/store/persona_store_test.go | 35 ++++++++++++++++++++ contrib/store/skill_store.go | 51 ++++++++++++++++++++++------- contrib/store/sqlite.go | 7 ++++ 5 files changed, 100 insertions(+), 16 deletions(-) diff --git a/contrib/store/budget_store.go b/contrib/store/budget_store.go index 57dd85b..be2fbb7 100644 --- a/contrib/store/budget_store.go +++ b/contrib/store/budget_store.go @@ -5,6 +5,7 @@ import ( "database/sql" "errors" "fmt" + "math" "time" "gitea.stevedudenhoeffer.com/steve/executus/budget" @@ -57,6 +58,11 @@ func (s *budgetStore) Get(ctx context.Context, userID string) (*budget.SkillBudg // Add increments usage atomically, rolling the 7-day window over inside one // transaction so concurrent Adds can't race the read-modify-write. func (s *budgetStore) Add(ctx context.Context, userID string, secondsUsed float64, now time.Time) error { + // A NaN/Inf would poison the seconds_used column irrecoverably (NaN + // propagates through every later add), so reject it at the boundary. + if math.IsNaN(secondsUsed) || math.IsInf(secondsUsed, 0) { + return fmt.Errorf("budgetStore.Add: invalid secondsUsed %v", secondsUsed) + } tx, err := s.db.BeginTx(ctx, nil) if err != nil { return fmt.Errorf("budgetStore.Add: begin: %w", err) diff --git a/contrib/store/persona_store.go b/contrib/store/persona_store.go index ef96df8..74187b1 100644 --- a/contrib/store/persona_store.go +++ b/contrib/store/persona_store.go @@ -157,11 +157,18 @@ func (s *personaStore) ListScheduledAgents(ctx context.Context, dueBefore time.T } func (s *personaStore) MarkAgentScheduledRun(ctx context.Context, agentID string, ranAt, nextAt time.Time) error { - a, err := s.GetAgent(ctx, agentID) + // Single atomic statement, not Get→mutate→Save: closes the lost-update + // window a concurrent Mark/edit would otherwise open. json_set keeps the + // blob's *time.Time fields consistent with the next_run_at column (Go + // encodes time.Time as RFC3339Nano, so it round-trips through GetAgent). + res, err := s.db.ExecContext(ctx, + `UPDATE agents SET next_run_at=?, data=json_set(data,'$.NextRunAt',?,'$.LastScheduledRunAt',?) WHERE id=?`, + nextAt.Unix(), nextAt.Format(time.RFC3339Nano), ranAt.Format(time.RFC3339Nano), agentID) if err != nil { - return err + return fmt.Errorf("personaStore.MarkAgentScheduledRun: %w", err) } - a.LastScheduledRunAt = &ranAt - a.NextRunAt = &nextAt - return s.SaveAgent(ctx, a) + if n, _ := res.RowsAffected(); n == 0 { + return persona.ErrNotFound + } + return nil } diff --git a/contrib/store/persona_store_test.go b/contrib/store/persona_store_test.go index 0c8ccbd..e04fa0e 100644 --- a/contrib/store/persona_store_test.go +++ b/contrib/store/persona_store_test.go @@ -69,3 +69,38 @@ func TestSQLitePersonaStore(t *testing.T) { t.Errorf("GetAgent after delete = %v, want ErrNotFound", err) } } + +// TestMarkAgentScheduledRunBlobRoundTrips guards the json_set atomic update: +// the JSON blob must stay parseable and reflect the new scheduled times. +func TestMarkAgentScheduledRunBlobRoundTrips(t *testing.T) { + ctx := context.Background() + db, _ := Open(":memory:") + defer db.Close() + st := db.Personas() + st.InitializeAgentStorage(ctx) + start := time.Now().UTC() + a := &persona.Agent{ID: "m1", Name: "n", OwnerID: "o", Schedule: "0 * * * *"} + a.NextRunAt = &start + if err := st.SaveAgent(ctx, a); err != nil { + t.Fatal(err) + } + ran := start + next := start.Add(time.Hour) + if err := st.MarkAgentScheduledRun(ctx, "m1", ran, next); err != nil { + t.Fatal(err) + } + got, err := st.GetAgent(ctx, "m1") // blob must still unmarshal + if err != nil { + t.Fatalf("GetAgent after json_set Mark failed (blob corrupt?): %v", err) + } + if got.NextRunAt == nil || !got.NextRunAt.Equal(next) { + t.Errorf("blob NextRunAt = %v, want %v", got.NextRunAt, next) + } + if got.LastScheduledRunAt == nil || !got.LastScheduledRunAt.Equal(ran) { + t.Errorf("blob LastScheduledRunAt = %v, want %v", got.LastScheduledRunAt, ran) + } + // Unknown id -> ErrNotFound. + if err := st.MarkAgentScheduledRun(ctx, "nope", ran, next); err != persona.ErrNotFound { + t.Errorf("Mark(unknown) = %v, want ErrNotFound", err) + } +} diff --git a/contrib/store/skill_store.go b/contrib/store/skill_store.go index f7ea2c6..0933edd 100644 --- a/contrib/store/skill_store.go +++ b/contrib/store/skill_store.go @@ -45,7 +45,7 @@ CREATE TABLE IF NOT EXISTS skill_versions ( seq INTEGER NOT NULL, -- append order, for newest-first data TEXT NOT NULL ); -CREATE INDEX IF NOT EXISTS idx_skill_versions_skill ON skill_versions(skill_id, seq);`) +CREATE UNIQUE INDEX IF NOT EXISTS idx_skill_versions_skill ON skill_versions(skill_id, seq);`) if err != nil { return fmt.Errorf("skillStore.Initialize: %w", err) } @@ -182,27 +182,56 @@ func (s *skillStore) ListDueScheduled(ctx context.Context, now time.Time) ([]ski } func (s *skillStore) MarkScheduledRun(ctx context.Context, skillID string, ranAt, nextAt time.Time) error { - sk, err := s.Get(ctx, skillID) - if err != nil { - return err + // Single atomic statement instead of Get→mutate→Save: a concurrent Mark or + // admin edit can't lose this update (no read-modify-write window). json_set + // keeps the JSON blob's NextRunAt/LastScheduledRunAt consistent with the + // indexed next_run_at column; RFC3339Nano matches Go's time JSON encoding so + // the blob still round-trips through Get. + var next int64 + if !nextAt.IsZero() { + next = nextAt.Unix() } - sk.LastScheduledRunAt = ranAt - sk.NextRunAt = nextAt - return s.Save(ctx, sk) + res, err := s.db.ExecContext(ctx, + `UPDATE skills SET next_run_at=?, data=json_set(data,'$.NextRunAt',?,'$.LastScheduledRunAt',?) WHERE id=?`, + next, nextAt.Format(time.RFC3339Nano), ranAt.Format(time.RFC3339Nano), skillID) + if err != nil { + return fmt.Errorf("skillStore.MarkScheduledRun: %w", err) + } + if n, _ := res.RowsAffected(); n == 0 { + return skill.ErrNotFound + } + return nil } func (s *skillStore) AppendVersion(ctx context.Context, sv skill.SkillVersion) error { + if sv.SkillID == "" { + return fmt.Errorf("skillStore.AppendVersion: skill_id is required") + } blob, err := json.Marshal(sv) if err != nil { return fmt.Errorf("skillStore.AppendVersion: marshal: %w", err) } - // seq = current max+1 for this skill (newest-first ordering key). + // seq = current max+1 for this skill (newest-first ordering key). The + // MAX-then-INSERT runs in ONE transaction and the (skill_id, seq) index is + // UNIQUE, so two concurrent appends can't both land the same seq: the loser + // fails loudly on commit instead of silently corrupting the ordering. The + // Scan error is propagated (was swallowed, leaving seq=0 on failure). + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("skillStore.AppendVersion: begin: %w", err) + } + defer tx.Rollback() //nolint:errcheck // no-op after Commit var seq int64 - _ = s.db.QueryRowContext(ctx, `SELECT COALESCE(MAX(seq),0)+1 FROM skill_versions WHERE skill_id = ?`, sv.SkillID).Scan(&seq) - if _, err := s.db.ExecContext(ctx, + if err := tx.QueryRowContext(ctx, `SELECT COALESCE(MAX(seq),0)+1 FROM skill_versions WHERE skill_id = ?`, sv.SkillID).Scan(&seq); err != nil { + return fmt.Errorf("skillStore.AppendVersion: seq: %w", err) + } + if _, err := tx.ExecContext(ctx, `INSERT INTO skill_versions (id, skill_id, version, seq, data) VALUES (?, ?, ?, ?, ?)`, sv.ID, sv.SkillID, sv.Version, seq, string(blob)); err != nil { - return fmt.Errorf("skillStore.AppendVersion: %w", err) + return fmt.Errorf("skillStore.AppendVersion: insert: %w", err) + } + if err := tx.Commit(); err != nil { + return fmt.Errorf("skillStore.AppendVersion: commit: %w", err) } return nil } diff --git a/contrib/store/sqlite.go b/contrib/store/sqlite.go index 3fd8893..e8548e6 100644 --- a/contrib/store/sqlite.go +++ b/contrib/store/sqlite.go @@ -33,6 +33,13 @@ func Open(dsn string) (*DB, error) { if err != nil { return nil, fmt.Errorf("store: open %q: %w", dsn, err) } + // A contended writer should WAIT for the lock, not fail immediately — set a + // busy_timeout so concurrent stores don't see spurious SQLITE_BUSY. (The + // doc example advertised this; it's now actually applied for every DSN.) + if _, err := sqldb.Exec("PRAGMA busy_timeout=5000"); err != nil { + sqldb.Close() + return nil, fmt.Errorf("store: set busy_timeout %q: %w", dsn, err) + } if err := sqldb.Ping(); err != nil { sqldb.Close() return nil, fmt.Errorf("store: ping %q: %w", dsn, err)