package state
import (
"context"
"database/sql"
"errors"
"fmt"
"os"
"path/filepath"
"time"
"github.com/jmoiron/sqlx"
"go.bigb.es/auxilia/culpa"
_ "modernc.org/sqlite"
)
// Store is the local SQLite persistence for offsets and outbox.
type Store struct {
db *sqlx.DB
}
// OutboxItem is an event waiting to be POSTed to the server.
type OutboxItem struct {
Tool string
Host string
SourceFile string
Payload []byte
}
// OutboxRow is an item already persisted in the outbox.
type OutboxRow struct {
ID int64 `db:"id"`
Tool string `db:"tool"`
Host string `db:"host"`
SourceFile string `db:"source_file"`
Payload []byte `db:"payload"`
CreatedAt int64 `db:"created_at"`
}
// Stats summarises the current store contents.
type Stats struct {
OutboxCount int64
OutboxBytes int64
SourceOffsets int64
}
// Open creates parent directories if needed, opens (or creates) the SQLite
// database at path, and applies idempotent migrations.
func Open(ctx context.Context, path string) (*Store, error) {
dir := filepath.Dir(path)
if err := os.MkdirAll(dir, 0o750); err != nil {
return nil, culpa.WithCode(culpa.Wrap(err, "create state directory"), "STATE_MKDIR")
}
dsn := buildDSN(path)
db, err := sqlx.ConnectContext(ctx, "sqlite", dsn)
if err != nil {
return nil, culpa.WithCode(culpa.Wrap(err, "open state database"), "STATE_OPEN")
}
if err := applyMigrations(ctx, db); err != nil {
_ = db.Close()
return nil, culpa.WithCode(culpa.Wrap(err, "apply state migrations"), "STATE_MIGRATE")
}
return &Store{db: db}, nil
}
// Close releases the underlying database.
func (s *Store) Close() error {
if s.db == nil {
return nil
}
return s.db.Close()
}
// DB exposes the underlying sqlx handle for tests that want to assert on
// schema directly. Production code should not use this.
func (s *Store) DB() *sqlx.DB {
return s.db
}
// GetOffset returns the last persisted offset for (tool, sourceFile).
// A missing row returns 0.
func (s *Store) GetOffset(ctx context.Context, tool, sourceFile string) (int64, error) {
var off int64
err := s.db.GetContext(ctx, &off,
`SELECT last_offset FROM ingestion_state WHERE tool = ? AND source_file = ?`,
tool, sourceFile)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return 0, nil
}
return 0, culpa.Wrap(err, "get offset")
}
return off, nil
}
// SaveOffset upserts the offset for (tool, sourceFile).
func (s *Store) SaveOffset(ctx context.Context, tool, sourceFile string, offset int64) error {
_, err := s.db.ExecContext(ctx, `
INSERT INTO ingestion_state (tool, source_file, last_offset, updated_at)
VALUES (?, ?, ?, ?)
ON CONFLICT (tool, source_file) DO UPDATE SET
last_offset = excluded.last_offset,
updated_at = excluded.updated_at
`, tool, sourceFile, offset, time.Now().Unix())
if err != nil {
return culpa.Wrap(err, "save offset")
}
return nil
}
// Enqueue inserts a new outbox item.
func (s *Store) Enqueue(ctx context.Context, item OutboxItem) error {
_, err := s.db.ExecContext(ctx, `
INSERT INTO outbox (tool, host, source_file, payload, created_at)
VALUES (?, ?, ?, ?, ?)
`, item.Tool, item.Host, item.SourceFile, item.Payload, time.Now().Unix())
if err != nil {
return culpa.Wrap(err, "enqueue outbox item")
}
return nil
}
// Oldest returns up to limit outbox rows ordered by creation time (id).
func (s *Store) Oldest(ctx context.Context, limit int) ([]OutboxRow, error) {
var rows []OutboxRow
err := s.db.SelectContext(ctx, &rows, `
SELECT id, tool, host, source_file, payload, created_at
FROM outbox
ORDER BY id ASC
LIMIT ?
`, limit)
if err != nil {
return nil, culpa.Wrap(err, "list oldest outbox items")
}
return rows, nil
}
// Delete removes outbox rows by id.
func (s *Store) Delete(ctx context.Context, ids []int64) error {
if len(ids) == 0 {
return nil
}
query, args, err := sqlx.In(`DELETE FROM outbox WHERE id IN (?)`, ids)
if err != nil {
return culpa.Wrap(err, "build delete query")
}
query = s.db.Rebind(query)
_, err = s.db.ExecContext(ctx, query, args...)
if err != nil {
return culpa.Wrap(err, "delete outbox items")
}
return nil
}
// Stats returns row counts and byte totals.
func (s *Store) Stats(ctx context.Context) (Stats, error) {
var st Stats
if err := s.db.GetContext(ctx, &st.OutboxCount, `SELECT COUNT(*) FROM outbox`); err != nil {
return st, culpa.Wrap(err, "count outbox")
}
if err := s.db.GetContext(ctx, &st.OutboxBytes, `SELECT COALESCE(SUM(LENGTH(payload)), 0) FROM outbox`); err != nil {
return st, culpa.Wrap(err, "sum outbox bytes")
}
if err := s.db.GetContext(ctx, &st.SourceOffsets, `SELECT COUNT(*) FROM ingestion_state`); err != nil {
return st, culpa.Wrap(err, "count offsets")
}
return st, nil
}
func buildDSN(path string) string {
return fmt.Sprintf(
"%s?_pragma=journal_mode(WAL)&_pragma=busy_timeout(5000)&_pragma=foreign_keys(on)&_pragma=synchronous(NORMAL)",
path,
)
}