package state

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"time"

	"github.com/jmoiron/sqlx"
	"go.bigb.es/auxilia/culpa"
	_ "modernc.org/sqlite" // blank import: provides the "sqlite" driver that Open connects with
)

// Store wraps the local SQLite database that keeps ingestion offsets and the
// outbox of pending events.
type Store struct {
	db *sqlx.DB
}

// OutboxItem is an event that is waiting to be POSTed to the server and has
// not been written to the outbox table yet.
type OutboxItem struct {
	Tool       string
	Host       string
	SourceFile string
	Payload    []byte
}

// OutboxRow is an outbox entry as read back from the database; the `db` tags
// name the outbox columns each field is scanned from.
type OutboxRow struct {
	ID         int64  `db:"id"`
	Tool       string `db:"tool"`
	Host       string `db:"host"`
	SourceFile string `db:"source_file"`
	Payload    []byte `db:"payload"`
	CreatedAt  int64  `db:"created_at"`
}

// Stats is a summary of what the store currently holds.
type Stats struct {
	OutboxCount   int64
	OutboxBytes   int64
	SourceOffsets int64
}

// Open prepares and opens the state database located at path.
//
// It first makes sure the parent directory of path exists (mode 0o750), then
// connects to the SQLite database using the DSN produced by buildDSN, and
// finally runs applyMigrations against the fresh handle.
//
// Every failure is wrapped and tagged with a code identifying the step that
// failed: STATE_MKDIR, STATE_OPEN or STATE_MIGRATE. When the migrations fail
// the database handle is closed before the error is returned.
func Open(ctx context.Context, path string) (*Store, error) {
	if mkErr := os.MkdirAll(filepath.Dir(path), 0o750); mkErr != nil {
		wrapped := culpa.Wrap(mkErr, "create state directory")
		return nil, culpa.WithCode(wrapped, "STATE_MKDIR")
	}

	handle, openErr := sqlx.ConnectContext(ctx, "sqlite", buildDSN(path))
	if openErr != nil {
		wrapped := culpa.Wrap(openErr, "open state database")
		return nil, culpa.WithCode(wrapped, "STATE_OPEN")
	}

	migErr := applyMigrations(ctx, handle)
	if migErr == nil {
		return &Store{db: handle}, nil
	}

	// Best-effort cleanup: the migration error is the one worth reporting,
	// so a failure to close the half-initialised handle is dropped.
	_ = handle.Close()
	return nil, culpa.WithCode(culpa.Wrap(migErr, "apply state migrations"), "STATE_MIGRATE")
}

// Close releases the underlying database handle. A Store that holds no
// handle closes successfully without doing anything.
func (s *Store) Close() error {
	if s.db != nil {
		return s.db.Close()
	}
	return nil
}

// DB returns the underlying sqlx handle. It exists so tests can make
// assertions about the schema directly; production code should not call it.
func (s *Store) DB() *sqlx.DB {
	return s.db
}

// GetOffset returns the last persisted offset for (tool, sourceFile).
// A missing row returns 0.
func (s *Store) GetOffset(ctx context.Context, tool, sourceFile string) (int64, error) { var off int64 err := s.db.GetContext(ctx, &off, `SELECT last_offset FROM ingestion_state WHERE tool = ? AND source_file = ?`, tool, sourceFile) if err != nil { if errors.Is(err, sql.ErrNoRows) { return 0, nil } return 0, culpa.Wrap(err, "get offset") } return off, nil } // SaveOffset upserts the offset for (tool, sourceFile). func (s *Store) SaveOffset(ctx context.Context, tool, sourceFile string, offset int64) error { _, err := s.db.ExecContext(ctx, ` INSERT INTO ingestion_state (tool, source_file, last_offset, updated_at) VALUES (?, ?, ?, ?) ON CONFLICT (tool, source_file) DO UPDATE SET last_offset = excluded.last_offset, updated_at = excluded.updated_at `, tool, sourceFile, offset, time.Now().Unix()) if err != nil { return culpa.Wrap(err, "save offset") } return nil } // Enqueue inserts a new outbox item. func (s *Store) Enqueue(ctx context.Context, item OutboxItem) error { _, err := s.db.ExecContext(ctx, ` INSERT INTO outbox (tool, host, source_file, payload, created_at) VALUES (?, ?, ?, ?, ?) `, item.Tool, item.Host, item.SourceFile, item.Payload, time.Now().Unix()) if err != nil { return culpa.Wrap(err, "enqueue outbox item") } return nil } // Oldest returns up to limit outbox rows ordered by creation time (id). func (s *Store) Oldest(ctx context.Context, limit int) ([]OutboxRow, error) { var rows []OutboxRow err := s.db.SelectContext(ctx, &rows, ` SELECT id, tool, host, source_file, payload, created_at FROM outbox ORDER BY id ASC LIMIT ? `, limit) if err != nil { return nil, culpa.Wrap(err, "list oldest outbox items") } return rows, nil } // Delete removes outbox rows by id. 
func (s *Store) Delete(ctx context.Context, ids []int64) error { if len(ids) == 0 { return nil } query, args, err := sqlx.In(`DELETE FROM outbox WHERE id IN (?)`, ids) if err != nil { return culpa.Wrap(err, "build delete query") } query = s.db.Rebind(query) _, err = s.db.ExecContext(ctx, query, args...) if err != nil { return culpa.Wrap(err, "delete outbox items") } return nil } // Stats returns row counts and byte totals. func (s *Store) Stats(ctx context.Context) (Stats, error) { var st Stats if err := s.db.GetContext(ctx, &st.OutboxCount, `SELECT COUNT(*) FROM outbox`); err != nil { return st, culpa.Wrap(err, "count outbox") } if err := s.db.GetContext(ctx, &st.OutboxBytes, `SELECT COALESCE(SUM(LENGTH(payload)), 0) FROM outbox`); err != nil { return st, culpa.Wrap(err, "sum outbox bytes") } if err := s.db.GetContext(ctx, &st.SourceOffsets, `SELECT COUNT(*) FROM ingestion_state`); err != nil { return st, culpa.Wrap(err, "count offsets") } return st, nil } func buildDSN(path string) string { return fmt.Sprintf( "%s?_pragma=journal_mode(WAL)&_pragma=busy_timeout(5000)&_pragma=foreign_keys(on)&_pragma=synchronous(NORMAL)", path, ) }