From d7eb706cf4910ee15b546f55d7b527d662ebf8f6 Mon Sep 17 00:00:00 2001 From: Eugene Blikh Date: Sun, 3 May 2026 17:34:35 +0300 Subject: [PATCH] collector: add config and state store --- internal/collector/config/config.go | 166 ++++++++++++++++ internal/collector/config/config_test.go | 161 ++++++++++++++++ internal/collector/state/migrations.go | 37 ++++ internal/collector/state/store.go | 178 +++++++++++++++++ internal/collector/state/store_test.go | 231 +++++++++++++++++++++++ 5 files changed, 773 insertions(+) create mode 100644 internal/collector/config/config.go create mode 100644 internal/collector/config/config_test.go create mode 100644 internal/collector/state/migrations.go create mode 100644 internal/collector/state/store.go create mode 100644 internal/collector/state/store_test.go diff --git a/internal/collector/config/config.go b/internal/collector/config/config.go new file mode 100644 index 0000000000000000000000000000000000000000..3c0f449a2bcdf3983a270838a463a029b89e48a8 --- /dev/null +++ b/internal/collector/config/config.go @@ -0,0 +1,166 @@ +// Package config loads and validates the lethe collector configuration from a +// YAML file plus LETHE_COLLECTOR_* environment overrides. +// +// Validation runs in fail-fast mode: any unknown YAML key, missing required +// field, or out-of-range value rejects the load with a culpa-wrapped error. +// The only field without a default is host — the operator must choose a stable +// identity for this machine. +package config + +import ( + "errors" + "io/fs" + "os" + "path/filepath" + "strings" + "time" + + "github.com/go-playground/validator/v10" + "github.com/go-viper/mapstructure/v2" + "github.com/spf13/viper" + "go.bigb.es/auxilia/culpa" +) + +// Config is the root collector configuration. +type Config struct { + Host string `mapstructure:"host" validate:"required"` + ServerURL string `mapstructure:"server_url" validate:"required,url"` + StateDir string `mapstructure:"state_dir"` + HTTP HTTPConfig `mapstructure:"http"` + Outbox OutboxConfig `mapstructure:"outbox"` + Sources []SourceConfig `mapstructure:"sources" validate:"required,min=1,dive"` + Log LogConfig `mapstructure:"log"` +} + +// HTTPConfig tunes the outbound POST client. +type HTTPConfig struct { + Timeout time.Duration `mapstructure:"timeout" validate:"gt=0"` + RetryMax int `mapstructure:"retry_max" validate:"gte=0"` +} + +// OutboxConfig caps the local safety-net buffer. +type OutboxConfig struct { + MaxBytes int64 `mapstructure:"max_bytes" validate:"gt=0"` +} + +// SourceConfig describes one tool's source root and polling behaviour. +type SourceConfig struct { + Tool string `mapstructure:"tool" validate:"required"` + Path string `mapstructure:"path" validate:"required"` + PollInterval time.Duration `mapstructure:"poll_interval" validate:"gt=0"` + BatchMaxLines int `mapstructure:"batch_max_lines" validate:"gt=0"` + BatchMaxBytes int64 `mapstructure:"batch_max_bytes" validate:"gt=0"` +} + +// LogConfig selects log level and formatter. +type LogConfig struct { + Level string `mapstructure:"level" validate:"required,oneof=debug info warn error"` + Format string `mapstructure:"format" validate:"required,oneof=tint json human"` +} + +// Load reads YAML from path, applies env overrides, fills documented defaults, +// expands ~ in paths, and validates. Errors carry a culpa code. +func Load(path string) (*Config, error) { + v := viper.New() + v.SetConfigFile(path) + v.SetEnvPrefix("LETHE_COLLECTOR") + v.SetEnvKeyReplacer(strings.NewReplacer(".", "_")) + v.AutomaticEnv() + + registerDefaults(v) + + if err := v.ReadInConfig(); err != nil { + if _, ok := err.(viper.ConfigFileNotFoundError); ok { + return nil, culpa.WithCode(culpa.Wrap(err, "config file not found"), "CONFIG_NOT_FOUND") + } + if isFileMissing(err) { + return nil, culpa.WithCode(culpa.Wrap(err, "config file not found"), "CONFIG_NOT_FOUND") + } + return nil, culpa.WithCode(culpa.Wrap(err, "parse config"), "CONFIG_PARSE") + } + + var cfg Config + if err := v.UnmarshalExact( + &cfg, + viper.DecodeHook(mapstructure.ComposeDecodeHookFunc( + mapstructure.StringToTimeDurationHookFunc(), + mapstructure.StringToSliceHookFunc(","), + )), + func(c *mapstructure.DecoderConfig) { + c.ErrorUnused = true + }, + ); err != nil { + return nil, culpa.WithCode(culpa.Wrap(err, "decode config"), "CONFIG_PARSE") + } + + applySourceDefaults(&cfg) + expandTildes(&cfg) + + val := validator.New(validator.WithRequiredStructEnabled()) + if err := val.Struct(&cfg); err != nil { + return nil, culpa.WithCode(culpa.Wrap(err, "validate config"), "CONFIG_VALIDATE") + } + return &cfg, nil +} + +// MustLoad calls Load and panics on error. +func MustLoad(path string) *Config { + cfg, err := Load(path) + if err != nil { + panic(err) + } + return cfg +} + +func registerDefaults(v *viper.Viper) { + home, _ := os.UserHomeDir() + v.SetDefault("state_dir", filepath.Join(home, ".local/state/lethe")) + v.SetDefault("http.timeout", 30*time.Second) + v.SetDefault("http.retry_max", 5) + v.SetDefault("outbox.max_bytes", int64(104857600)) // 100 MiB + v.SetDefault("log.level", "info") + v.SetDefault("log.format", "human") +} + +func applySourceDefaults(cfg *Config) { + for i := range cfg.Sources { + if cfg.Sources[i].PollInterval == 0 { + cfg.Sources[i].PollInterval = 30 * time.Second + } + if cfg.Sources[i].BatchMaxLines == 0 { + cfg.Sources[i].BatchMaxLines = 500 + } + if cfg.Sources[i].BatchMaxBytes == 0 { + cfg.Sources[i].BatchMaxBytes = 8388608 // 8 MiB + } + } +} + +func expandTildes(cfg *Config) { + cfg.StateDir = expandTilde(cfg.StateDir) + for i := range cfg.Sources { + cfg.Sources[i].Path = expandTilde(cfg.Sources[i].Path) + } +} + +func expandTilde(path string) string { + if path == "~" || strings.HasPrefix(path, "~/") { + home, err := os.UserHomeDir() + if err != nil { + return path + } + if path == "~" { + return home + } + return filepath.Join(home, path[2:]) + } + return path +} + +func isFileMissing(err error) bool { + var pe *fs.PathError + if errors.As(err, &pe) { + return errors.Is(pe.Err, fs.ErrNotExist) + } + return errors.Is(err, fs.ErrNotExist) +} diff --git a/internal/collector/config/config_test.go b/internal/collector/config/config_test.go new file mode 100644 index 0000000000000000000000000000000000000000..194197e37f7016205738f6c917539c81bddab80d --- /dev/null +++ b/internal/collector/config/config_test.go @@ -0,0 +1,161 @@ +package config_test + +import ( + "os" + "path/filepath" + "strings" + "testing" + "time" + + "sourcecraft.dev/bigbes/lethe/internal/collector/config" +) + +const validYAML = ` +host: "laptop" +server_url: "https://phoebe.tailnet.ts.net" +sources: + - tool: "claude-code" + path: "~/.claude/projects" +` + +func writeYAML(t *testing.T, body string) string { + t.Helper() + dir := t.TempDir() + path := filepath.Join(dir, "config.yaml") + if err := os.WriteFile(path, []byte(body), 0o600); err != nil { + t.Fatalf("write yaml: %v", err) + } + return path +} + +func TestLoad_Valid(t *testing.T) { + path := writeYAML(t, validYAML) + cfg, err := config.Load(path) + if err != nil { + t.Fatalf("Load: %v", err) + } + if cfg.Host != "laptop" { + t.Errorf("Host = %q, want laptop", cfg.Host) + } + if cfg.ServerURL != "https://phoebe.tailnet.ts.net" { + t.Errorf("ServerURL = %q", cfg.ServerURL) + } + if len(cfg.Sources) != 1 || cfg.Sources[0].Tool != "claude-code" { + t.Errorf("Sources = %+v", cfg.Sources) + } +} + +func TestLoad_MissingHostRejected(t *testing.T) { + body := strings.Replace(validYAML, `host: "laptop"`, ``, 1) + if _, err := config.Load(writeYAML(t, body)); err == nil { + t.Fatal("expected error for missing host, got nil") + } +} + +func TestLoad_MissingServerURLRejected(t *testing.T) { + body := strings.Replace(validYAML, `server_url: "https://phoebe.tailnet.ts.net"`, ``, 1) + if _, err := config.Load(writeYAML(t, body)); err == nil { + t.Fatal("expected error for missing server_url, got nil") + } +} + +func TestLoad_MissingSourcesRejected(t *testing.T) { + body := strings.Replace(validYAML, "sources:\n - tool: \"claude-code\"\n path: \"~/.claude/projects\"\n", ``, 1) + if _, err := config.Load(writeYAML(t, body)); err == nil { + t.Fatal("expected error for missing sources, got nil") + } +} + +func TestLoad_EmptySourcesRejected(t *testing.T) { + body := strings.Replace(validYAML, "sources:\n - tool: \"claude-code\"\n path: \"~/.claude/projects\"\n", "sources: []\n", 1) + if _, err := config.Load(writeYAML(t, body)); err == nil { + t.Fatal("expected error for empty sources, got nil") + } +} + +func TestLoad_Defaults(t *testing.T) { + path := writeYAML(t, validYAML) + cfg, err := config.Load(path) + if err != nil { + t.Fatalf("Load: %v", err) + } + if cfg.StateDir == "" { + t.Error("StateDir is empty") + } + if cfg.HTTP.Timeout != 30*time.Second { + t.Errorf("HTTP.Timeout = %s, want 30s", cfg.HTTP.Timeout) + } + if cfg.HTTP.RetryMax != 5 { + t.Errorf("HTTP.RetryMax = %d, want 5", cfg.HTTP.RetryMax) + } + if cfg.Outbox.MaxBytes != 104857600 { + t.Errorf("Outbox.MaxBytes = %d, want 104857600", cfg.Outbox.MaxBytes) + } + if cfg.Sources[0].PollInterval != 30*time.Second { + t.Errorf("Sources[0].PollInterval = %s, want 30s", cfg.Sources[0].PollInterval) + } + if cfg.Sources[0].BatchMaxLines != 500 { + t.Errorf("Sources[0].BatchMaxLines = %d, want 500", cfg.Sources[0].BatchMaxLines) + } + if cfg.Sources[0].BatchMaxBytes != 8388608 { + t.Errorf("Sources[0].BatchMaxBytes = %d, want 8388608", cfg.Sources[0].BatchMaxBytes) + } + if cfg.Log.Level != "info" { + t.Errorf("Log.Level = %q, want info", cfg.Log.Level) + } + if cfg.Log.Format != "human" { + t.Errorf("Log.Format = %q, want human", cfg.Log.Format) + } +} + +func TestLoad_TildeExpansion(t *testing.T) { + path := writeYAML(t, validYAML) + cfg, err := config.Load(path) + if err != nil { + t.Fatalf("Load: %v", err) + } + home, _ := os.UserHomeDir() + want := filepath.Join(home, ".claude/projects") + if cfg.Sources[0].Path != want { + t.Errorf("Sources[0].Path = %q, want %q", cfg.Sources[0].Path, want) + } + wantState := filepath.Join(home, ".local/state/lethe") + if cfg.StateDir != wantState { + t.Errorf("StateDir = %q, want %q", cfg.StateDir, wantState) + } +} + +func TestLoad_UnknownYAMLKeyRejected(t *testing.T) { + body := validYAML + "totally_unknown_key: 42\n" + if _, err := config.Load(writeYAML(t, body)); err == nil { + t.Fatal("expected error for unknown YAML key, got nil") + } +} + +func TestLoad_InvalidServerURLRejected(t *testing.T) { + body := strings.Replace(validYAML, `server_url: "https://phoebe.tailnet.ts.net"`, `server_url: "not a url"`, 1) + if _, err := config.Load(writeYAML(t, body)); err == nil { + t.Fatal("expected error for invalid server_url, got nil") + } +} + +func TestLoad_EnvOverride(t *testing.T) { + path := writeYAML(t, validYAML) + t.Setenv("LETHE_COLLECTOR_HOST", "desktop") + cfg, err := config.Load(path) + if err != nil { + t.Fatalf("Load: %v", err) + } + if cfg.Host != "desktop" { + t.Errorf("Host = %q, want desktop", cfg.Host) + } +} + +func TestMustLoad_PanicsOnError(t *testing.T) { + defer func() { + if r := recover(); r == nil { + t.Fatal("expected MustLoad to panic on missing file") + } + }() + config.MustLoad("/nonexistent/path/that/should/not/exist.yaml") +} diff --git a/internal/collector/state/migrations.go b/internal/collector/state/migrations.go new file mode 100644 index 0000000000000000000000000000000000000000..4dbae50fe3ddb543e2051c20afb982ff0ef53d96 --- /dev/null +++ b/internal/collector/state/migrations.go @@ -0,0 +1,37 @@ +package state + +import ( + "context" + + "github.com/jmoiron/sqlx" + "go.bigb.es/auxilia/culpa" +) + +// applyMigrations runs idempotent DDL to create the ingestion_state and +// outbox tables. It is safe to call multiple times. +func applyMigrations(_ context.Context, db *sqlx.DB) error { + stmts := []string{ + `CREATE TABLE IF NOT EXISTS ingestion_state ( + tool TEXT NOT NULL, + source_file TEXT NOT NULL, + last_offset INTEGER NOT NULL DEFAULT 0, + updated_at INTEGER NOT NULL, + PRIMARY KEY (tool, source_file) + )`, + `CREATE TABLE IF NOT EXISTS outbox ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + tool TEXT NOT NULL, + host TEXT NOT NULL, + source_file TEXT NOT NULL, + payload BLOB NOT NULL, + created_at INTEGER NOT NULL + )`, + `CREATE INDEX IF NOT EXISTS outbox_created_at ON outbox(created_at)`, + } + for _, stmt := range stmts { + if _, err := db.Exec(stmt); err != nil { + return culpa.Wrap(err, "migration statement failed") + } + } + return nil +} diff --git a/internal/collector/state/store.go b/internal/collector/state/store.go new file mode 100644 index 0000000000000000000000000000000000000000..9768de47208b2aaec61cda03f12420636cd80fad --- /dev/null +++ b/internal/collector/state/store.go @@ -0,0 +1,178 @@ +package state + +import ( + "context" + "database/sql" + "errors" + "fmt" + "os" + "path/filepath" + "time" + + "github.com/jmoiron/sqlx" + "go.bigb.es/auxilia/culpa" + _ "modernc.org/sqlite" +) + +// Store is the local SQLite persistence for offsets and outbox. +type Store struct { + db *sqlx.DB +} + +// OutboxItem is an event waiting to be POSTed to the server. +type OutboxItem struct { + Tool string + Host string + SourceFile string + Payload []byte +} + +// OutboxRow is an item already persisted in the outbox. +type OutboxRow struct { + ID int64 `db:"id"` + Tool string `db:"tool"` + Host string `db:"host"` + SourceFile string `db:"source_file"` + Payload []byte `db:"payload"` + CreatedAt int64 `db:"created_at"` +} + +// Stats summarises the current store contents. +type Stats struct { + OutboxCount int64 + OutboxBytes int64 + SourceOffsets int64 +} + +// Open creates parent directories if needed, opens (or creates) the SQLite +// database at path, and applies idempotent migrations. +func Open(ctx context.Context, path string) (*Store, error) { + dir := filepath.Dir(path) + if err := os.MkdirAll(dir, 0o750); err != nil { + return nil, culpa.WithCode(culpa.Wrap(err, "create state directory"), "STATE_MKDIR") + } + + dsn := buildDSN(path) + db, err := sqlx.ConnectContext(ctx, "sqlite", dsn) + if err != nil { + return nil, culpa.WithCode(culpa.Wrap(err, "open state database"), "STATE_OPEN") + } + + if err := applyMigrations(ctx, db); err != nil { + _ = db.Close() + return nil, culpa.WithCode(culpa.Wrap(err, "apply state migrations"), "STATE_MIGRATE") + } + + return &Store{db: db}, nil +} + +// Close releases the underlying database. +func (s *Store) Close() error { + if s.db == nil { + return nil + } + return s.db.Close() +} + +// DB exposes the underlying sqlx handle for tests that want to assert on +// schema directly. Production code should not use this. +func (s *Store) DB() *sqlx.DB { + return s.db +} + +// GetOffset returns the last persisted offset for (tool, sourceFile). +// A missing row returns 0. +func (s *Store) GetOffset(ctx context.Context, tool, sourceFile string) (int64, error) { + var off int64 + err := s.db.GetContext(ctx, &off, + `SELECT last_offset FROM ingestion_state WHERE tool = ? AND source_file = ?`, + tool, sourceFile) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return 0, nil + } + return 0, culpa.Wrap(err, "get offset") + } + return off, nil +} + +// SaveOffset upserts the offset for (tool, sourceFile). +func (s *Store) SaveOffset(ctx context.Context, tool, sourceFile string, offset int64) error { + _, err := s.db.ExecContext(ctx, ` + INSERT INTO ingestion_state (tool, source_file, last_offset, updated_at) + VALUES (?, ?, ?, ?) + ON CONFLICT (tool, source_file) DO UPDATE SET + last_offset = excluded.last_offset, + updated_at = excluded.updated_at + `, tool, sourceFile, offset, time.Now().Unix()) + if err != nil { + return culpa.Wrap(err, "save offset") + } + return nil +} + +// Enqueue inserts a new outbox item. +func (s *Store) Enqueue(ctx context.Context, item OutboxItem) error { + _, err := s.db.ExecContext(ctx, ` + INSERT INTO outbox (tool, host, source_file, payload, created_at) + VALUES (?, ?, ?, ?, ?) + `, item.Tool, item.Host, item.SourceFile, item.Payload, time.Now().Unix()) + if err != nil { + return culpa.Wrap(err, "enqueue outbox item") + } + return nil +} + +// Oldest returns up to limit outbox rows ordered by creation time (id). +func (s *Store) Oldest(ctx context.Context, limit int) ([]OutboxRow, error) { + var rows []OutboxRow + err := s.db.SelectContext(ctx, &rows, ` + SELECT id, tool, host, source_file, payload, created_at + FROM outbox + ORDER BY id ASC + LIMIT ? + `, limit) + if err != nil { + return nil, culpa.Wrap(err, "list oldest outbox items") + } + return rows, nil +} + +// Delete removes outbox rows by id. +func (s *Store) Delete(ctx context.Context, ids []int64) error { + if len(ids) == 0 { + return nil + } + query, args, err := sqlx.In(`DELETE FROM outbox WHERE id IN (?)`, ids) + if err != nil { + return culpa.Wrap(err, "build delete query") + } + query = s.db.Rebind(query) + _, err = s.db.ExecContext(ctx, query, args...) + if err != nil { + return culpa.Wrap(err, "delete outbox items") + } + return nil +} + +// Stats returns row counts and byte totals. +func (s *Store) Stats(ctx context.Context) (Stats, error) { + var st Stats + if err := s.db.GetContext(ctx, &st.OutboxCount, `SELECT COUNT(*) FROM outbox`); err != nil { + return st, culpa.Wrap(err, "count outbox") + } + if err := s.db.GetContext(ctx, &st.OutboxBytes, `SELECT COALESCE(SUM(LENGTH(payload)), 0) FROM outbox`); err != nil { + return st, culpa.Wrap(err, "sum outbox bytes") + } + if err := s.db.GetContext(ctx, &st.SourceOffsets, `SELECT COUNT(*) FROM ingestion_state`); err != nil { + return st, culpa.Wrap(err, "count offsets") + } + return st, nil +} + +func buildDSN(path string) string { + return fmt.Sprintf( + "%s?_pragma=journal_mode(WAL)&_pragma=busy_timeout(5000)&_pragma=foreign_keys(on)&_pragma=synchronous(NORMAL)", + path, + ) +} diff --git a/internal/collector/state/store_test.go b/internal/collector/state/store_test.go new file mode 100644 index 0000000000000000000000000000000000000000..a21d40ffec6270b40a9905a1260532b08c9e27ac --- /dev/null +++ b/internal/collector/state/store_test.go @@ -0,0 +1,231 @@ +package state_test + +import ( + "context" + "testing" + + "sourcecraft.dev/bigbes/lethe/internal/collector/state" +) + +func openStore(t *testing.T) *state.Store { + t.Helper() + ctx := context.Background() + dir := t.TempDir() + s, err := state.Open(ctx, dir+"/test.db") + if err != nil { + t.Fatalf("Open: %v", err) + } + t.Cleanup(func() { _ = s.Close() }) + return s +} + +func TestOpen_CreatesTables(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + s, err := state.Open(ctx, dir+"/test.db") + if err != nil { + t.Fatalf("Open: %v", err) + } + defer func() { _ = s.Close() }() + + // Verify we can query schema. + var n int + if err := s.DB().Get(&n, `SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = 'ingestion_state'`); err != nil { + t.Fatalf("query schema: %v", err) + } + if n != 1 { + t.Fatalf("expected ingestion_state table, got %d", n) + } + if err := s.DB().Get(&n, `SELECT COUNT(*) FROM sqlite_master WHERE type = 'table' AND name = 'outbox'`); err != nil { + t.Fatalf("query schema: %v", err) + } + if n != 1 { + t.Fatalf("expected outbox table, got %d", n) + } +} + +func TestOpen_Idempotent(t *testing.T) { + ctx := context.Background() + dir := t.TempDir() + path := dir + "/test.db" + if _, err := state.Open(ctx, path); err != nil { + t.Fatalf("first open: %v", err) + } + if _, err := state.Open(ctx, path); err != nil { + t.Fatalf("second open: %v", err) + } +} + +func TestGetOffset_MissingReturnsZero(t *testing.T) { + s := openStore(t) + ctx := context.Background() + off, err := s.GetOffset(ctx, "claude-code", "/tmp/foo.jsonl") + if err != nil { + t.Fatalf("GetOffset: %v", err) + } + if off != 0 { + t.Errorf("GetOffset = %d, want 0", off) + } +} + +func TestGetOffset_DBErrorPropagated(t *testing.T) { + s := openStore(t) + ctx := context.Background() + if err := s.Close(); err != nil { + t.Fatalf("Close: %v", err) + } + _, err := s.GetOffset(ctx, "claude-code", "/tmp/foo.jsonl") + if err == nil { + t.Fatal("expected error after closing database, got nil") + } +} + +func TestSaveOffsetAndGetOffset(t *testing.T) { + s := openStore(t) + ctx := context.Background() + if err := s.SaveOffset(ctx, "claude-code", "/tmp/foo.jsonl", 1234); err != nil { + t.Fatalf("SaveOffset: %v", err) + } + off, err := s.GetOffset(ctx, "claude-code", "/tmp/foo.jsonl") + if err != nil { + t.Fatalf("GetOffset: %v", err) + } + if off != 1234 { + t.Errorf("GetOffset = %d, want 1234", off) + } +} + +func TestSaveOffset_UpdatesExisting(t *testing.T) { + s := openStore(t) + ctx := context.Background() + if err := s.SaveOffset(ctx, "claude-code", "/tmp/foo.jsonl", 100); err != nil { + t.Fatalf("SaveOffset: %v", err) + } + if err := s.SaveOffset(ctx, "claude-code", "/tmp/foo.jsonl", 200); err != nil { + t.Fatalf("SaveOffset update: %v", err) + } + off, err := s.GetOffset(ctx, "claude-code", "/tmp/foo.jsonl") + if err != nil { + t.Fatalf("GetOffset: %v", err) + } + if off != 200 { + t.Errorf("GetOffset = %d, want 200", off) + } +} + +func TestEnqueueAndOldest(t *testing.T) { + s := openStore(t) + ctx := context.Background() + item := state.OutboxItem{ + Tool: "claude-code", + Host: "laptop", + SourceFile: "/tmp/foo.jsonl", + Payload: []byte(`{"test":1}`), + } + if err := s.Enqueue(ctx, item); err != nil { + t.Fatalf("Enqueue: %v", err) + } + rows, err := s.Oldest(ctx, 10) + if err != nil { + t.Fatalf("Oldest: %v", err) + } + if len(rows) != 1 { + t.Fatalf("expected 1 row, got %d", len(rows)) + } + if string(rows[0].Payload) != `{"test":1}` { + t.Errorf("Payload = %s", rows[0].Payload) + } +} + +func TestOldest_Limit(t *testing.T) { + s := openStore(t) + ctx := context.Background() + for i := range 3 { + item := state.OutboxItem{ + Tool: "claude-code", + Host: "laptop", + SourceFile: "/tmp/foo.jsonl", + Payload: []byte(`x`), + } + _ = item // avoid unused if we change loop; we need i for unique payload + item.Payload = []byte(string(rune('a' + i))) + if err := s.Enqueue(ctx, item); err != nil { + t.Fatalf("Enqueue: %v", err) + } + } + rows, err := s.Oldest(ctx, 2) + if err != nil { + t.Fatalf("Oldest: %v", err) + } + if len(rows) != 2 { + t.Errorf("expected 2 rows, got %d", len(rows)) + } +} + +func TestDelete(t *testing.T) { + s := openStore(t) + ctx := context.Background() + item := state.OutboxItem{ + Tool: "claude-code", + Host: "laptop", + SourceFile: "/tmp/foo.jsonl", + Payload: []byte(`x`), + } + if err := s.Enqueue(ctx, item); err != nil { + t.Fatalf("Enqueue: %v", err) + } + rows, _ := s.Oldest(ctx, 10) + if len(rows) != 1 { + t.Fatalf("expected 1 row before delete") + } + if err := s.Delete(ctx, []int64{rows[0].ID}); err != nil { + t.Fatalf("Delete: %v", err) + } + rows, err := s.Oldest(ctx, 10) + if err != nil { + t.Fatalf("Oldest after delete: %v", err) + } + if len(rows) != 0 { + t.Errorf("expected 0 rows after delete, got %d", len(rows)) + } +} + +func TestStats(t *testing.T) { + s := openStore(t) + ctx := context.Background() + st, err := s.Stats(ctx) + if err != nil { + t.Fatalf("Stats: %v", err) + } + if st.OutboxCount != 0 { + t.Errorf("OutboxCount = %d, want 0", st.OutboxCount) + } + if st.OutboxBytes != 0 { + t.Errorf("OutboxBytes = %d, want 0", st.OutboxBytes) + } + if st.SourceOffsets != 0 { + t.Errorf("SourceOffsets = %d, want 0", st.SourceOffsets) + } + + // Add one offset and one outbox item. + if err := s.SaveOffset(ctx, "cc", "/a.jsonl", 42); err != nil { + t.Fatalf("SaveOffset: %v", err) + } + if err := s.Enqueue(ctx, state.OutboxItem{Tool: "cc", Host: "h", SourceFile: "/a.jsonl", Payload: []byte("hello")}); err != nil { + t.Fatalf("Enqueue: %v", err) + } + + st, err = s.Stats(ctx) + if err != nil { + t.Fatalf("Stats: %v", err) + } + if st.OutboxCount != 1 { + t.Errorf("OutboxCount = %d, want 1", st.OutboxCount) + } + if st.OutboxBytes != 5 { + t.Errorf("OutboxBytes = %d, want 5", st.OutboxBytes) + } + if st.SourceOffsets != 1 { + t.Errorf("SourceOffsets = %d, want 1", st.SourceOffsets) + } +}