~bigbes/lethe

d7eb706cf4910ee15b546f55d7b527d662ebf8f6 — Eugene Blikh 24 days ago ab6efef
collector: add config and state store
A internal/collector/config/config.go => internal/collector/config/config.go +166 -0
@@ 0,0 1,166 @@
// Package config loads and validates the lethe collector configuration from a
// YAML file plus LETHE_COLLECTOR_* environment overrides.
//
// Validation runs in fail-fast mode: any unknown YAML key, missing required
// field, or out-of-range value rejects the load with a culpa-wrapped error.
// The only field without a default is host — the operator must choose a stable
// identity for this machine.
package config

import (
	"errors"
	"io/fs"
	"os"
	"path/filepath"
	"strings"
	"time"

	"github.com/go-playground/validator/v10"
	"github.com/go-viper/mapstructure/v2"
	"github.com/spf13/viper"
	"go.bigb.es/auxilia/culpa"
)

// Config is the root collector configuration.
type Config struct {
	Host      string         `mapstructure:"host"       validate:"required"`
	ServerURL string         `mapstructure:"server_url" validate:"required,url"`
	StateDir  string         `mapstructure:"state_dir"`
	HTTP      HTTPConfig     `mapstructure:"http"`
	Outbox    OutboxConfig   `mapstructure:"outbox"`
	Sources   []SourceConfig `mapstructure:"sources"    validate:"required,min=1,dive"`
	Log       LogConfig      `mapstructure:"log"`
}

// HTTPConfig tunes the outbound POST client.
type HTTPConfig struct {
	Timeout  time.Duration `mapstructure:"timeout"    validate:"gt=0"`
	RetryMax int           `mapstructure:"retry_max"  validate:"gte=0"`
}

// OutboxConfig caps the local safety-net buffer.
type OutboxConfig struct {
	MaxBytes int64 `mapstructure:"max_bytes" validate:"gt=0"`
}

// SourceConfig describes one tool's source root and polling behaviour.
type SourceConfig struct {
	Tool          string        `mapstructure:"tool"            validate:"required"`
	Path          string        `mapstructure:"path"            validate:"required"`
	PollInterval  time.Duration `mapstructure:"poll_interval"   validate:"gt=0"`
	BatchMaxLines int           `mapstructure:"batch_max_lines" validate:"gt=0"`
	BatchMaxBytes int64         `mapstructure:"batch_max_bytes" validate:"gt=0"`
}

// LogConfig selects log level and formatter.
type LogConfig struct {
	Level  string `mapstructure:"level"  validate:"required,oneof=debug info warn error"`
	Format string `mapstructure:"format" validate:"required,oneof=tint json human"`
}

// Load reads YAML from path, applies env overrides, fills documented defaults,
// expands ~ in paths, and validates. Errors carry a culpa code.
func Load(path string) (*Config, error) {
	v := viper.New()
	v.SetConfigFile(path)
	v.SetEnvPrefix("LETHE_COLLECTOR")
	v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
	v.AutomaticEnv()

	registerDefaults(v)

	if err := v.ReadInConfig(); err != nil {
		if _, ok := err.(viper.ConfigFileNotFoundError); ok {
			return nil, culpa.WithCode(culpa.Wrap(err, "config file not found"), "CONFIG_NOT_FOUND")
		}
		if isFileMissing(err) {
			return nil, culpa.WithCode(culpa.Wrap(err, "config file not found"), "CONFIG_NOT_FOUND")
		}
		return nil, culpa.WithCode(culpa.Wrap(err, "parse config"), "CONFIG_PARSE")
	}

	var cfg Config
	if err := v.UnmarshalExact(
		&cfg,
		viper.DecodeHook(mapstructure.ComposeDecodeHookFunc(
			mapstructure.StringToTimeDurationHookFunc(),
			mapstructure.StringToSliceHookFunc(","),
		)),
		func(c *mapstructure.DecoderConfig) {
			c.ErrorUnused = true
		},
	); err != nil {
		return nil, culpa.WithCode(culpa.Wrap(err, "decode config"), "CONFIG_PARSE")
	}

	applySourceDefaults(&cfg)
	expandTildes(&cfg)

	val := validator.New(validator.WithRequiredStructEnabled())
	if err := val.Struct(&cfg); err != nil {
		return nil, culpa.WithCode(culpa.Wrap(err, "validate config"), "CONFIG_VALIDATE")
	}
	return &cfg, nil
}

// MustLoad calls Load and panics on error.
func MustLoad(path string) *Config {
	cfg, err := Load(path)
	if err != nil {
		panic(err)
	}
	return cfg
}

func registerDefaults(v *viper.Viper) {
	home, _ := os.UserHomeDir()
	v.SetDefault("state_dir", filepath.Join(home, ".local/state/lethe"))
	v.SetDefault("http.timeout", 30*time.Second)
	v.SetDefault("http.retry_max", 5)
	v.SetDefault("outbox.max_bytes", int64(104857600)) // 100 MiB
	v.SetDefault("log.level", "info")
	v.SetDefault("log.format", "human")
}

func applySourceDefaults(cfg *Config) {
	for i := range cfg.Sources {
		if cfg.Sources[i].PollInterval == 0 {
			cfg.Sources[i].PollInterval = 30 * time.Second
		}
		if cfg.Sources[i].BatchMaxLines == 0 {
			cfg.Sources[i].BatchMaxLines = 500
		}
		if cfg.Sources[i].BatchMaxBytes == 0 {
			cfg.Sources[i].BatchMaxBytes = 8388608 // 8 MiB
		}
	}
}

func expandTildes(cfg *Config) {
	cfg.StateDir = expandTilde(cfg.StateDir)
	for i := range cfg.Sources {
		cfg.Sources[i].Path = expandTilde(cfg.Sources[i].Path)
	}
}

func expandTilde(path string) string {
	if path == "~" || strings.HasPrefix(path, "~/") {
		home, err := os.UserHomeDir()
		if err != nil {
			return path
		}
		if path == "~" {
			return home
		}
		return filepath.Join(home, path[2:])
	}
	return path
}

func isFileMissing(err error) bool {
	var pe *fs.PathError
	if errors.As(err, &pe) {
		return errors.Is(pe.Err, fs.ErrNotExist)
	}
	return errors.Is(err, fs.ErrNotExist)
}

A internal/collector/config/config_test.go => internal/collector/config/config_test.go +161 -0
@@ 0,0 1,161 @@
package config_test

import (
	"os"
	"path/filepath"
	"strings"
	"testing"
	"time"

	"sourcecraft.dev/bigbes/lethe/internal/collector/config"
)

const validYAML = `
host: "laptop"
server_url: "https://phoebe.tailnet.ts.net"
sources:
  - tool: "claude-code"
    path: "~/.claude/projects"
`

func writeYAML(t *testing.T, body string) string {
	t.Helper()
	dir := t.TempDir()
	path := filepath.Join(dir, "config.yaml")
	if err := os.WriteFile(path, []byte(body), 0o600); err != nil {
		t.Fatalf("write yaml: %v", err)
	}
	return path
}

func TestLoad_Valid(t *testing.T) {
	path := writeYAML(t, validYAML)
	cfg, err := config.Load(path)
	if err != nil {
		t.Fatalf("Load: %v", err)
	}
	if cfg.Host != "laptop" {
		t.Errorf("Host = %q, want laptop", cfg.Host)
	}
	if cfg.ServerURL != "https://phoebe.tailnet.ts.net" {
		t.Errorf("ServerURL = %q", cfg.ServerURL)
	}
	if len(cfg.Sources) != 1 || cfg.Sources[0].Tool != "claude-code" {
		t.Errorf("Sources = %+v", cfg.Sources)
	}
}

func TestLoad_MissingHostRejected(t *testing.T) {
	body := strings.Replace(validYAML, `host: "laptop"`, ``, 1)
	if _, err := config.Load(writeYAML(t, body)); err == nil {
		t.Fatal("expected error for missing host, got nil")
	}
}

func TestLoad_MissingServerURLRejected(t *testing.T) {
	body := strings.Replace(validYAML, `server_url: "https://phoebe.tailnet.ts.net"`, ``, 1)
	if _, err := config.Load(writeYAML(t, body)); err == nil {
		t.Fatal("expected error for missing server_url, got nil")
	}
}

func TestLoad_MissingSourcesRejected(t *testing.T) {
	body := strings.Replace(validYAML, "sources:\n  - tool: \"claude-code\"\n    path: \"~/.claude/projects\"\n", ``, 1)
	if _, err := config.Load(writeYAML(t, body)); err == nil {
		t.Fatal("expected error for missing sources, got nil")
	}
}

func TestLoad_EmptySourcesRejected(t *testing.T) {
	body := strings.Replace(validYAML, "sources:\n  - tool: \"claude-code\"\n    path: \"~/.claude/projects\"\n", "sources: []\n", 1)
	if _, err := config.Load(writeYAML(t, body)); err == nil {
		t.Fatal("expected error for empty sources, got nil")
	}
}

func TestLoad_Defaults(t *testing.T) {
	path := writeYAML(t, validYAML)
	cfg, err := config.Load(path)
	if err != nil {
		t.Fatalf("Load: %v", err)
	}
	if cfg.StateDir == "" {
		t.Error("StateDir is empty")
	}
	if cfg.HTTP.Timeout != 30*time.Second {
		t.Errorf("HTTP.Timeout = %s, want 30s", cfg.HTTP.Timeout)
	}
	if cfg.HTTP.RetryMax != 5 {
		t.Errorf("HTTP.RetryMax = %d, want 5", cfg.HTTP.RetryMax)
	}
	if cfg.Outbox.MaxBytes != 104857600 {
		t.Errorf("Outbox.MaxBytes = %d, want 104857600", cfg.Outbox.MaxBytes)
	}
	if cfg.Sources[0].PollInterval != 30*time.Second {
		t.Errorf("Sources[0].PollInterval = %s, want 30s", cfg.Sources[0].PollInterval)
	}
	if cfg.Sources[0].BatchMaxLines != 500 {
		t.Errorf("Sources[0].BatchMaxLines = %d, want 500", cfg.Sources[0].BatchMaxLines)
	}
	if cfg.Sources[0].BatchMaxBytes != 8388608 {
		t.Errorf("Sources[0].BatchMaxBytes = %d, want 8388608", cfg.Sources[0].BatchMaxBytes)
	}
	if cfg.Log.Level != "info" {
		t.Errorf("Log.Level = %q, want info", cfg.Log.Level)
	}
	if cfg.Log.Format != "human" {
		t.Errorf("Log.Format = %q, want human", cfg.Log.Format)
	}
}

func TestLoad_TildeExpansion(t *testing.T) {
	path := writeYAML(t, validYAML)
	cfg, err := config.Load(path)
	if err != nil {
		t.Fatalf("Load: %v", err)
	}
	home, _ := os.UserHomeDir()
	want := filepath.Join(home, ".claude/projects")
	if cfg.Sources[0].Path != want {
		t.Errorf("Sources[0].Path = %q, want %q", cfg.Sources[0].Path, want)
	}
	wantState := filepath.Join(home, ".local/state/lethe")
	if cfg.StateDir != wantState {
		t.Errorf("StateDir = %q, want %q", cfg.StateDir, wantState)
	}
}

func TestLoad_UnknownYAMLKeyRejected(t *testing.T) {
	body := validYAML + "totally_unknown_key: 42\n"
	if _, err := config.Load(writeYAML(t, body)); err == nil {
		t.Fatal("expected error for unknown YAML key, got nil")
	}
}

func TestLoad_InvalidServerURLRejected(t *testing.T) {
	body := strings.Replace(validYAML, `server_url: "https://phoebe.tailnet.ts.net"`, `server_url: "not a url"`, 1)
	if _, err := config.Load(writeYAML(t, body)); err == nil {
		t.Fatal("expected error for invalid server_url, got nil")
	}
}

func TestLoad_EnvOverride(t *testing.T) {
	path := writeYAML(t, validYAML)
	t.Setenv("LETHE_COLLECTOR_HOST", "desktop")
	cfg, err := config.Load(path)
	if err != nil {
		t.Fatalf("Load: %v", err)
	}
	if cfg.Host != "desktop" {
		t.Errorf("Host = %q, want desktop", cfg.Host)
	}
}

func TestMustLoad_PanicsOnError(t *testing.T) {
	defer func() {
		if r := recover(); r == nil {
			t.Fatal("expected MustLoad to panic on missing file")
		}
	}()
	config.MustLoad("/nonexistent/path/that/should/not/exist.yaml")
}

A internal/collector/state/migrations.go => internal/collector/state/migrations.go +37 -0
@@ 0,0 1,37 @@
package state

import (
	"context"

	"github.com/jmoiron/sqlx"
	"go.bigb.es/auxilia/culpa"
)

// applyMigrations runs idempotent DDL to create the ingestion_state and
// outbox tables. It is safe to call multiple times.
func applyMigrations(_ context.Context, db *sqlx.DB) error {
	stmts := []string{
		`CREATE TABLE IF NOT EXISTS ingestion_state (
			tool        TEXT NOT NULL,
			source_file TEXT NOT NULL,
			last_offset INTEGER NOT NULL DEFAULT 0,
			updated_at  INTEGER NOT NULL,
			PRIMARY KEY (tool, source_file)
		)`,
		`CREATE TABLE IF NOT EXISTS outbox (
			id          INTEGER PRIMARY KEY AUTOINCREMENT,
			tool        TEXT NOT NULL,
			host        TEXT NOT NULL,
			source_file TEXT NOT NULL,
			payload     BLOB NOT NULL,
			created_at  INTEGER NOT NULL
		)`,
		`CREATE INDEX IF NOT EXISTS outbox_created_at ON outbox(created_at)`,
	}
	for _, stmt := range stmts {
		if _, err := db.Exec(stmt); err != nil {
			return culpa.Wrap(err, "migration statement failed")
		}
	}
	return nil
}

A internal/collector/state/store.go => internal/collector/state/store.go +178 -0
@@ 0,0 1,178 @@
package state

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"time"

	"github.com/jmoiron/sqlx"
	"go.bigb.es/auxilia/culpa"
	_ "modernc.org/sqlite"
)

// Store is the local SQLite persistence for offsets and outbox.
type Store struct {
	db *sqlx.DB
}

// OutboxItem is an event waiting to be POSTed to the server.
type OutboxItem struct {
	Tool       string
	Host       string
	SourceFile string
	Payload    []byte
}

// OutboxRow is an item already persisted in the outbox.
type OutboxRow struct {
	ID         int64  `db:"id"`
	Tool       string `db:"tool"`
	Host       string `db:"host"`
	SourceFile string `db:"source_file"`
	Payload    []byte `db:"payload"`
	CreatedAt  int64  `db:"created_at"`
}

// Stats summarises the current store contents.
type Stats struct {
	OutboxCount   int64
	OutboxBytes   int64
	SourceOffsets int64
}

// Open creates parent directories if needed, opens (or creates) the SQLite
// database at path, and applies idempotent migrations.
func Open(ctx context.Context, path string) (*Store, error) {
	dir := filepath.Dir(path)
	if err := os.MkdirAll(dir, 0o750); err != nil {
		return nil, culpa.WithCode(culpa.Wrap(err, "create state directory"), "STATE_MKDIR")
	}

	dsn := buildDSN(path)
	db, err := sqlx.ConnectContext(ctx, "sqlite", dsn)
	if err != nil {
		return nil, culpa.WithCode(culpa.Wrap(err, "open state database"), "STATE_OPEN")
	}

	if err := applyMigrations(ctx, db); err != nil {
		_ = db.Close()
		return nil, culpa.WithCode(culpa.Wrap(err, "apply state migrations"), "STATE_MIGRATE")
	}

	return &Store{db: db}, nil
}

// Close releases the underlying database.
func (s *Store) Close() error {
	if s.db == nil {
		return nil
	}
	return s.db.Close()
}

// DB exposes the underlying sqlx handle for tests that want to assert on
// schema directly. Production code should not use this.
func (s *Store) DB() *sqlx.DB {
	return s.db
}

// GetOffset returns the last persisted offset for (tool, sourceFile).
// A missing row returns 0.
func (s *Store) GetOffset(ctx context.Context, tool, sourceFile string) (int64, error) {
	var off int64
	err := s.db.GetContext(ctx, &off,
		`SELECT last_offset FROM ingestion_state WHERE tool = ? AND source_file = ?`,
		tool, sourceFile)
	if err != nil {
		if errors.Is(err, sql.ErrNoRows) {
			return 0, nil
		}
		return 0, culpa.Wrap(err, "get offset")
	}
	return off, nil
}

// SaveOffset upserts the offset for (tool, sourceFile).
func (s *Store) SaveOffset(ctx context.Context, tool, sourceFile string, offset int64) error {
	_, err := s.db.ExecContext(ctx, `
		INSERT INTO ingestion_state (tool, source_file, last_offset, updated_at)
		VALUES (?, ?, ?, ?)
		ON CONFLICT (tool, source_file) DO UPDATE SET
			last_offset = excluded.last_offset,
			updated_at = excluded.updated_at
	`, tool, sourceFile, offset, time.Now().Unix())
	if err != nil {
		return culpa.Wrap(err, "save offset")
	}
	return nil
}

// Enqueue inserts a new outbox item.
func (s *Store) Enqueue(ctx context.Context, item OutboxItem) error {
	_, err := s.db.ExecContext(ctx, `
		INSERT INTO outbox (tool, host, source_file, payload, created_at)
		VALUES (?, ?, ?, ?, ?)
	`, item.Tool, item.Host, item.SourceFile, item.Payload, time.Now().Unix())
	if err != nil {
		return culpa.Wrap(err, "enqueue outbox item")
	}
	return nil
}

// Oldest returns up to limit outbox rows ordered by creation time (id).
func (s *Store) Oldest(ctx context.Context, limit int) ([]OutboxRow, error) {
	var rows []OutboxRow
	err := s.db.SelectContext(ctx, &rows, `
		SELECT id, tool, host, source_file, payload, created_at
		FROM outbox
		ORDER BY id ASC
		LIMIT ?
	`, limit)
	if err != nil {
		return nil, culpa.Wrap(err, "list oldest outbox items")
	}
	return rows, nil
}

// Delete removes outbox rows by id.
func (s *Store) Delete(ctx context.Context, ids []int64) error {
	if len(ids) == 0 {
		return nil
	}
	query, args, err := sqlx.In(`DELETE FROM outbox WHERE id IN (?)`, ids)
	if err != nil {
		return culpa.Wrap(err, "build delete query")
	}
	query = s.db.Rebind(query)
	_, err = s.db.ExecContext(ctx, query, args...)
	if err != nil {
		return culpa.Wrap(err, "delete outbox items")
	}
	return nil
}

// Stats returns row counts and byte totals.
func (s *Store) Stats(ctx context.Context) (Stats, error) {
	var st Stats
	if err := s.db.GetContext(ctx, &st.OutboxCount, `SELECT COUNT(*) FROM outbox`); err != nil {
		return st, culpa.Wrap(err, "count outbox")
	}
	if err := s.db.GetContext(ctx, &st.OutboxBytes, `SELECT COALESCE(SUM(LENGTH(payload)), 0) FROM outbox`); err != nil {
		return st, culpa.Wrap(err, "sum outbox bytes")
	}
	if err := s.db.GetContext(ctx, &st.SourceOffsets, `SELECT COUNT(*) FROM ingestion_state`); err != nil {
		return st, culpa.Wrap(err, "count offsets")
	}
	return st, nil
}

func buildDSN(path string) string {
	return fmt.Sprintf(
		"%s?_pragma=journal_mode(WAL)&_pragma=busy_timeout(5000)&_pragma=foreign_keys(on)&_pragma=synchronous(NORMAL)",
		path,
	)
}

A internal/collector/state/store_test.go => internal/collector/state/store_test.go +231 -0
@@ 0,0 1,231 @@
package state_test

import (
	"context"
	"testing"

	"sourcecraft.dev/bigbes/lethe/internal/collector/state"
)

func openStore(t *testing.T) *state.Store {
	t.Helper()
	ctx := context.Background()
	dir := t.TempDir()
	s, err := state.Open(ctx, dir+"/test.db")
	if err != nil {
		t.Fatalf("Open: %v", err)
	}
	t.Cleanup(func() { _ = s.Close() })
	return s
}

func TestOpen_CreatesTables(t *testing.T) {
	ctx := context.Background()
	dir := t.TempDir()
	s, err := state.Open(ctx, dir+"/test.db")
	if err != nil {
		t.Fatalf("Open: %v", err)
	}
	defer func() { _ = s.Close() }()

	// Verify we can query schema.
	var n int
	if err := s.DB().Get(&n, `SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = 'ingestion_state'`); err != nil {
		t.Fatalf("query schema: %v", err)
	}
	if n != 1 {
		t.Fatalf("expected ingestion_state table, got %d", n)
	}
	if err := s.DB().Get(&n, `SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = 'outbox'`); err != nil {
		t.Fatalf("query schema: %v", err)
	}
	if n != 1 {
		t.Fatalf("expected outbox table, got %d", n)
	}
}

func TestOpen_Idempotent(t *testing.T) {
	ctx := context.Background()
	dir := t.TempDir()
	path := dir + "/test.db"
	if _, err := state.Open(ctx, path); err != nil {
		t.Fatalf("first open: %v", err)
	}
	if _, err := state.Open(ctx, path); err != nil {
		t.Fatalf("second open: %v", err)
	}
}

func TestGetOffset_MissingReturnsZero(t *testing.T) {
	s := openStore(t)
	ctx := context.Background()
	off, err := s.GetOffset(ctx, "claude-code", "/tmp/foo.jsonl")
	if err != nil {
		t.Fatalf("GetOffset: %v", err)
	}
	if off != 0 {
		t.Errorf("GetOffset = %d, want 0", off)
	}
}

func TestGetOffset_DBErrorPropagated(t *testing.T) {
	s := openStore(t)
	ctx := context.Background()
	if err := s.Close(); err != nil {
		t.Fatalf("Close: %v", err)
	}
	_, err := s.GetOffset(ctx, "claude-code", "/tmp/foo.jsonl")
	if err == nil {
		t.Fatal("expected error after closing database, got nil")
	}
}

func TestSaveOffsetAndGetOffset(t *testing.T) {
	s := openStore(t)
	ctx := context.Background()
	if err := s.SaveOffset(ctx, "claude-code", "/tmp/foo.jsonl", 1234); err != nil {
		t.Fatalf("SaveOffset: %v", err)
	}
	off, err := s.GetOffset(ctx, "claude-code", "/tmp/foo.jsonl")
	if err != nil {
		t.Fatalf("GetOffset: %v", err)
	}
	if off != 1234 {
		t.Errorf("GetOffset = %d, want 1234", off)
	}
}

func TestSaveOffset_UpdatesExisting(t *testing.T) {
	s := openStore(t)
	ctx := context.Background()
	if err := s.SaveOffset(ctx, "claude-code", "/tmp/foo.jsonl", 100); err != nil {
		t.Fatalf("SaveOffset: %v", err)
	}
	if err := s.SaveOffset(ctx, "claude-code", "/tmp/foo.jsonl", 200); err != nil {
		t.Fatalf("SaveOffset update: %v", err)
	}
	off, err := s.GetOffset(ctx, "claude-code", "/tmp/foo.jsonl")
	if err != nil {
		t.Fatalf("GetOffset: %v", err)
	}
	if off != 200 {
		t.Errorf("GetOffset = %d, want 200", off)
	}
}

func TestEnqueueAndOldest(t *testing.T) {
	s := openStore(t)
	ctx := context.Background()
	item := state.OutboxItem{
		Tool:       "claude-code",
		Host:       "laptop",
		SourceFile: "/tmp/foo.jsonl",
		Payload:    []byte(`{"test":1}`),
	}
	if err := s.Enqueue(ctx, item); err != nil {
		t.Fatalf("Enqueue: %v", err)
	}
	rows, err := s.Oldest(ctx, 10)
	if err != nil {
		t.Fatalf("Oldest: %v", err)
	}
	if len(rows) != 1 {
		t.Fatalf("expected 1 row, got %d", len(rows))
	}
	if string(rows[0].Payload) != `{"test":1}` {
		t.Errorf("Payload = %s", rows[0].Payload)
	}
}

func TestOldest_Limit(t *testing.T) {
	s := openStore(t)
	ctx := context.Background()
	for i := range 3 {
		item := state.OutboxItem{
			Tool:       "claude-code",
			Host:       "laptop",
			SourceFile: "/tmp/foo.jsonl",
			Payload:    []byte(`x`),
		}
		_ = item // avoid unused if we change loop; we need i for unique payload
		item.Payload = []byte(string(rune('a' + i)))
		if err := s.Enqueue(ctx, item); err != nil {
			t.Fatalf("Enqueue: %v", err)
		}
	}
	rows, err := s.Oldest(ctx, 2)
	if err != nil {
		t.Fatalf("Oldest: %v", err)
	}
	if len(rows) != 2 {
		t.Errorf("expected 2 rows, got %d", len(rows))
	}
}

func TestDelete(t *testing.T) {
	s := openStore(t)
	ctx := context.Background()
	item := state.OutboxItem{
		Tool:       "claude-code",
		Host:       "laptop",
		SourceFile: "/tmp/foo.jsonl",
		Payload:    []byte(`x`),
	}
	if err := s.Enqueue(ctx, item); err != nil {
		t.Fatalf("Enqueue: %v", err)
	}
	rows, _ := s.Oldest(ctx, 10)
	if len(rows) != 1 {
		t.Fatalf("expected 1 row before delete")
	}
	if err := s.Delete(ctx, []int64{rows[0].ID}); err != nil {
		t.Fatalf("Delete: %v", err)
	}
	rows, err := s.Oldest(ctx, 10)
	if err != nil {
		t.Fatalf("Oldest after delete: %v", err)
	}
	if len(rows) != 0 {
		t.Errorf("expected 0 rows after delete, got %d", len(rows))
	}
}

func TestStats(t *testing.T) {
	s := openStore(t)
	ctx := context.Background()
	st, err := s.Stats(ctx)
	if err != nil {
		t.Fatalf("Stats: %v", err)
	}
	if st.OutboxCount != 0 {
		t.Errorf("OutboxCount = %d, want 0", st.OutboxCount)
	}
	if st.OutboxBytes != 0 {
		t.Errorf("OutboxBytes = %d, want 0", st.OutboxBytes)
	}
	if st.SourceOffsets != 0 {
		t.Errorf("SourceOffsets = %d, want 0", st.SourceOffsets)
	}

	// Add one offset and one outbox item.
	if err := s.SaveOffset(ctx, "cc", "/a.jsonl", 42); err != nil {
		t.Fatalf("SaveOffset: %v", err)
	}
	if err := s.Enqueue(ctx, state.OutboxItem{Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte("hello")}); err != nil {
		t.Fatalf("Enqueue: %v", err)
	}

	st, err = s.Stats(ctx)
	if err != nil {
		t.Fatalf("Stats: %v", err)
	}
	if st.OutboxCount != 1 {
		t.Errorf("OutboxCount = %d, want 1", st.OutboxCount)
	}
	if st.OutboxBytes != 5 {
		t.Errorf("OutboxBytes = %d, want 5", st.OutboxBytes)
	}
	if st.SourceOffsets != 1 {
		t.Errorf("SourceOffsets = %d, want 1", st.SourceOffsets)
	}
}