fix: address verified gadfly P5/#5 findings (contrib/store concurrency)
executus CI / test (pull_request) Successful in 1m34s

All 3 cloud models converged on real concurrency bugs in the SQLite stores:

- AppendVersion (HIGH): the seq key was `SELECT MAX(seq)+1` then INSERT in two
  un-transacted statements with a NON-unique index, AND the Scan error was
  swallowed (seq stayed 0 on failure). Concurrent appends could both land the
  same seq, silently breaking newest-first ordering. Now: one transaction, the
  Scan error is propagated, the (skill_id, seq) index is UNIQUE (the loser of a
  race fails loudly), and an empty SkillID is rejected.
- MarkScheduledRun / MarkAgentScheduledRun (all 3): replaced the Get→mutate→Save
  read-modify-write (lost-update window) with a single atomic UPDATE using
  json_set, so a concurrent Mark/edit can't clobber it. json_set keeps the JSON
  blob's NextRunAt/LastScheduledRunAt consistent with the indexed column;
  RFC3339Nano matches Go's time encoding so the blob still round-trips (tested).
- Open: actually applies PRAGMA busy_timeout=5000 (the doc advertised it but it
  was never set) — a contended writer waits instead of erroring SQLITE_BUSY.
- budgetStore.Add: rejects NaN/Inf secondsUsed (would irrecoverably poison the
  column).

Triaged-but-kept: plaintext webhook secret (documented design, high-entropy URL
key, pre-existing); SQL()/free-form `where` helpers (no untrusted input reaches
them — defense-in-depth notes only).

Core go.sum still free of host/DB deps; contrib/store green (incl. a json_set
blob-round-trip test).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
2026-06-26 23:37:37 -04:00
parent b194a9621d
commit c8a87f1733
5 changed files with 100 additions and 16 deletions
+6
View File
@@ -5,6 +5,7 @@ import (
"database/sql" "database/sql"
"errors" "errors"
"fmt" "fmt"
"math"
"time" "time"
"gitea.stevedudenhoeffer.com/steve/executus/budget" "gitea.stevedudenhoeffer.com/steve/executus/budget"
@@ -57,6 +58,11 @@ func (s *budgetStore) Get(ctx context.Context, userID string) (*budget.SkillBudg
// Add increments usage atomically, rolling the 7-day window over inside one // Add increments usage atomically, rolling the 7-day window over inside one
// transaction so concurrent Adds can't race the read-modify-write. // transaction so concurrent Adds can't race the read-modify-write.
func (s *budgetStore) Add(ctx context.Context, userID string, secondsUsed float64, now time.Time) error { func (s *budgetStore) Add(ctx context.Context, userID string, secondsUsed float64, now time.Time) error {
// A NaN/Inf would poison the seconds_used column irrecoverably (NaN
// propagates through every later add), so reject it at the boundary.
if math.IsNaN(secondsUsed) || math.IsInf(secondsUsed, 0) {
return fmt.Errorf("budgetStore.Add: invalid secondsUsed %v", secondsUsed)
}
tx, err := s.db.BeginTx(ctx, nil) tx, err := s.db.BeginTx(ctx, nil)
if err != nil { if err != nil {
return fmt.Errorf("budgetStore.Add: begin: %w", err) return fmt.Errorf("budgetStore.Add: begin: %w", err)
+12 -5
View File
@@ -157,11 +157,18 @@ func (s *personaStore) ListScheduledAgents(ctx context.Context, dueBefore time.T
} }
func (s *personaStore) MarkAgentScheduledRun(ctx context.Context, agentID string, ranAt, nextAt time.Time) error { func (s *personaStore) MarkAgentScheduledRun(ctx context.Context, agentID string, ranAt, nextAt time.Time) error {
a, err := s.GetAgent(ctx, agentID) // Single atomic statement, not Get→mutate→Save: closes the lost-update
// window a concurrent Mark/edit would otherwise open. json_set keeps the
// blob's *time.Time fields consistent with the next_run_at column (Go
// encodes time.Time as RFC3339Nano, so it round-trips through GetAgent).
res, err := s.db.ExecContext(ctx,
`UPDATE agents SET next_run_at=?, data=json_set(data,'$.NextRunAt',?,'$.LastScheduledRunAt',?) WHERE id=?`,
nextAt.Unix(), nextAt.Format(time.RFC3339Nano), ranAt.Format(time.RFC3339Nano), agentID)
if err != nil { if err != nil {
return err return fmt.Errorf("personaStore.MarkAgentScheduledRun: %w", err)
} }
a.LastScheduledRunAt = &ranAt if n, _ := res.RowsAffected(); n == 0 {
a.NextRunAt = &nextAt return persona.ErrNotFound
return s.SaveAgent(ctx, a) }
return nil
} }
+35
View File
@@ -69,3 +69,38 @@ func TestSQLitePersonaStore(t *testing.T) {
t.Errorf("GetAgent after delete = %v, want ErrNotFound", err) t.Errorf("GetAgent after delete = %v, want ErrNotFound", err)
} }
} }
// TestMarkAgentScheduledRunBlobRoundTrips guards the json_set atomic update:
// the JSON blob must stay parseable and reflect the new scheduled times.
func TestMarkAgentScheduledRunBlobRoundTrips(t *testing.T) {
ctx := context.Background()
db, _ := Open(":memory:")
defer db.Close()
st := db.Personas()
st.InitializeAgentStorage(ctx)
start := time.Now().UTC()
a := &persona.Agent{ID: "m1", Name: "n", OwnerID: "o", Schedule: "0 * * * *"}
a.NextRunAt = &start
if err := st.SaveAgent(ctx, a); err != nil {
t.Fatal(err)
}
ran := start
next := start.Add(time.Hour)
if err := st.MarkAgentScheduledRun(ctx, "m1", ran, next); err != nil {
t.Fatal(err)
}
got, err := st.GetAgent(ctx, "m1") // blob must still unmarshal
if err != nil {
t.Fatalf("GetAgent after json_set Mark failed (blob corrupt?): %v", err)
}
if got.NextRunAt == nil || !got.NextRunAt.Equal(next) {
t.Errorf("blob NextRunAt = %v, want %v", got.NextRunAt, next)
}
if got.LastScheduledRunAt == nil || !got.LastScheduledRunAt.Equal(ran) {
t.Errorf("blob LastScheduledRunAt = %v, want %v", got.LastScheduledRunAt, ran)
}
// Unknown id -> ErrNotFound.
if err := st.MarkAgentScheduledRun(ctx, "nope", ran, next); err != persona.ErrNotFound {
t.Errorf("Mark(unknown) = %v, want ErrNotFound", err)
}
}
+40 -11
View File
@@ -45,7 +45,7 @@ CREATE TABLE IF NOT EXISTS skill_versions (
seq INTEGER NOT NULL, -- append order, for newest-first seq INTEGER NOT NULL, -- append order, for newest-first
data TEXT NOT NULL data TEXT NOT NULL
); );
CREATE INDEX IF NOT EXISTS idx_skill_versions_skill ON skill_versions(skill_id, seq);`) CREATE UNIQUE INDEX IF NOT EXISTS idx_skill_versions_skill ON skill_versions(skill_id, seq);`)
if err != nil { if err != nil {
return fmt.Errorf("skillStore.Initialize: %w", err) return fmt.Errorf("skillStore.Initialize: %w", err)
} }
@@ -182,27 +182,56 @@ func (s *skillStore) ListDueScheduled(ctx context.Context, now time.Time) ([]ski
} }
func (s *skillStore) MarkScheduledRun(ctx context.Context, skillID string, ranAt, nextAt time.Time) error { func (s *skillStore) MarkScheduledRun(ctx context.Context, skillID string, ranAt, nextAt time.Time) error {
sk, err := s.Get(ctx, skillID) // Single atomic statement instead of Get→mutate→Save: a concurrent Mark or
if err != nil { // admin edit can't lose this update (no read-modify-write window). json_set
return err // keeps the JSON blob's NextRunAt/LastScheduledRunAt consistent with the
// indexed next_run_at column; RFC3339Nano matches Go's time JSON encoding so
// the blob still round-trips through Get.
var next int64
if !nextAt.IsZero() {
next = nextAt.Unix()
} }
sk.LastScheduledRunAt = ranAt res, err := s.db.ExecContext(ctx,
sk.NextRunAt = nextAt `UPDATE skills SET next_run_at=?, data=json_set(data,'$.NextRunAt',?,'$.LastScheduledRunAt',?) WHERE id=?`,
return s.Save(ctx, sk) next, nextAt.Format(time.RFC3339Nano), ranAt.Format(time.RFC3339Nano), skillID)
if err != nil {
return fmt.Errorf("skillStore.MarkScheduledRun: %w", err)
}
if n, _ := res.RowsAffected(); n == 0 {
return skill.ErrNotFound
}
return nil
} }
func (s *skillStore) AppendVersion(ctx context.Context, sv skill.SkillVersion) error { func (s *skillStore) AppendVersion(ctx context.Context, sv skill.SkillVersion) error {
if sv.SkillID == "" {
return fmt.Errorf("skillStore.AppendVersion: skill_id is required")
}
blob, err := json.Marshal(sv) blob, err := json.Marshal(sv)
if err != nil { if err != nil {
return fmt.Errorf("skillStore.AppendVersion: marshal: %w", err) return fmt.Errorf("skillStore.AppendVersion: marshal: %w", err)
} }
// seq = current max+1 for this skill (newest-first ordering key). // seq = current max+1 for this skill (newest-first ordering key). The
// MAX-then-INSERT runs in ONE transaction and the (skill_id, seq) index is
// UNIQUE, so two concurrent appends can't both land the same seq: the loser
// fails loudly on commit instead of silently corrupting the ordering. The
// Scan error is propagated (was swallowed, leaving seq=0 on failure).
tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("skillStore.AppendVersion: begin: %w", err)
}
defer tx.Rollback() //nolint:errcheck // no-op after Commit
var seq int64 var seq int64
_ = s.db.QueryRowContext(ctx, `SELECT COALESCE(MAX(seq),0)+1 FROM skill_versions WHERE skill_id = ?`, sv.SkillID).Scan(&seq) if err := tx.QueryRowContext(ctx, `SELECT COALESCE(MAX(seq),0)+1 FROM skill_versions WHERE skill_id = ?`, sv.SkillID).Scan(&seq); err != nil {
if _, err := s.db.ExecContext(ctx, return fmt.Errorf("skillStore.AppendVersion: seq: %w", err)
}
if _, err := tx.ExecContext(ctx,
`INSERT INTO skill_versions (id, skill_id, version, seq, data) VALUES (?, ?, ?, ?, ?)`, `INSERT INTO skill_versions (id, skill_id, version, seq, data) VALUES (?, ?, ?, ?, ?)`,
sv.ID, sv.SkillID, sv.Version, seq, string(blob)); err != nil { sv.ID, sv.SkillID, sv.Version, seq, string(blob)); err != nil {
return fmt.Errorf("skillStore.AppendVersion: %w", err) return fmt.Errorf("skillStore.AppendVersion: insert: %w", err)
}
if err := tx.Commit(); err != nil {
return fmt.Errorf("skillStore.AppendVersion: commit: %w", err)
} }
return nil return nil
} }
+7
View File
@@ -33,6 +33,13 @@ func Open(dsn string) (*DB, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("store: open %q: %w", dsn, err) return nil, fmt.Errorf("store: open %q: %w", dsn, err)
} }
// A contended writer should WAIT for the lock, not fail immediately — set a
// busy_timeout so concurrent stores don't see spurious SQLITE_BUSY. (The
// doc example advertised this; it's now actually applied for every DSN.)
if _, err := sqldb.Exec("PRAGMA busy_timeout=5000"); err != nil {
sqldb.Close()
return nil, fmt.Errorf("store: set busy_timeout %q: %w", dsn, err)
}
if err := sqldb.Ping(); err != nil { if err := sqldb.Ping(); err != nil {
sqldb.Close() sqldb.Close()
return nil, fmt.Errorf("store: ping %q: %w", dsn, err) return nil, fmt.Errorf("store: ping %q: %w", dsn, err)